Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/optimization.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

416 statements  

1from __future__ import annotations 

2 

3import math 

4import numbers 

5from enum import Enum 

6 

7from dask import config, utils 

8from dask._task_spec import GraphNode 

9from dask.core import ( 

10 flatten, 

11 get_dependencies, 

12 ishashable, 

13 istask, 

14 reverse_dict, 

15 subs, 

16 toposort, 

17) 

18 

19 

def cull(dsk, keys):
    """Return new dask with only the tasks required to calculate keys.

    In other words, remove unnecessary tasks from dask.
    ``keys`` may be a single key or list of keys.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
    >>> dsk, dependencies = cull(d, 'out')
    >>> dsk  # doctest: +ELLIPSIS
    {'out': (<function add at ...>, 'x', 10), 'x': 1}
    >>> dependencies  # doctest: +ELLIPSIS
    {'out': ['x'], 'x': []}

    Returns
    -------
    dsk: culled dask graph
    dependencies: Dict mapping {key: [deps]}. Useful side effect to accelerate
        other optimizations, notably fuse.
    """
    if not isinstance(keys, (list, set)):
        keys = [keys]

    dependencies = dict()
    out = {}
    work = list(set(flatten(keys)))
    # Seed ``seen`` with the initial work items so a requested key that is
    # also a dependency of another requested key is expanded only once.
    # (Previously ``seen`` started empty, so such keys were processed twice.)
    seen = set(work)

    # Breadth-first traversal from the requested keys, copying each reachable
    # task and recording its dependencies.
    while work:
        new_work = []
        for k in work:
            dependencies_k = get_dependencies(dsk, k, as_list=True)  # fuse needs lists
            out[k] = dsk[k]
            dependencies[k] = dependencies_k
            for d in dependencies_k:
                if d not in seen:
                    seen.add(d)
                    new_work.append(d)

        work = new_work

    return out, dependencies

69 

70 

def default_fused_linear_keys_renamer(keys):
    """Create new keys for fused tasks"""
    # ``keys`` is ordered root-first; the fused name lists the remaining
    # keys bottom-up and ends with the root key (or its name component).
    head = keys[0]
    if type(head) is str:
        parts = [utils.key_split(k) for k in reversed(keys[1:])]
        parts.append(head)
        return "-".join(parts)
    if type(head) is tuple and head and isinstance(head[0], str):
        parts = [utils.key_split(k) for k in reversed(keys[1:])]
        parts.append(head[0])
        return ("-".join(parts),) + head[1:]
    # Unrecognized key type: signal "no rename" to the caller.
    return None

84 

85 

def fuse_linear(dsk, keys=None, dependencies=None, rename_keys=True):
    """Return new dask graph with linear sequence of tasks fused together.

    If specified, the keys in ``keys`` keyword argument are *not* fused.
    Supply ``dependencies`` from output of ``cull`` if available to avoid
    recomputing dependencies.

    **This function is mostly superseded by ``fuse``**

    Parameters
    ----------
    dsk: dict
    keys: list
    dependencies: dict, optional
        {key: [list-of-keys]}. Must be a list to provide count of each key
        This optional input often comes from ``cull``
    rename_keys: bool or func, optional
        Whether to rename fused keys with ``default_fused_linear_keys_renamer``
        or not. Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used. For advanced usage, a
        func is also accepted, ``new_key = rename_keys(fused_key_list)``.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dsk, dependencies = fuse(d)
    >>> dsk  # doctest: +SKIP
    {'a-b-c': (inc, (inc, 1)), 'c': 'a-b-c'}
    >>> dsk, dependencies = fuse(d, rename_keys=False)
    >>> dsk  # doctest: +ELLIPSIS
    {'c': (<function inc at ...>, (<function inc at ...>, 1))}
    >>> dsk, dependencies = fuse(d, keys=['b'], rename_keys=False)
    >>> dsk  # doctest: +ELLIPSIS
    {'b': (<function inc at ...>, 1), 'c': (<function inc at ...>, 'b')}

    Returns
    -------
    dsk: output graph with keys fused
    dependencies: dict mapping dependencies after fusion. Useful side effect
        to accelerate other downstream optimizations.
    """
    # Normalize ``keys`` to a flat set for O(1) membership tests.
    if keys is not None and not isinstance(keys, set):
        if not isinstance(keys, list):
            keys = [keys]
        keys = set(flatten(keys))

    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k, as_list=True) for k in dsk}

    # locate all members of linear chains
    # child2parent maps a key to its *unique* dependent; a key that is
    # protected (in ``keys``), has multiple dependents, or whose parent has
    # multiple dependencies is marked unfusible instead.
    child2parent = {}
    unfusible = set()
    for parent in dsk:
        deps = dependencies[parent]
        has_many_children = len(deps) > 1
        for child in deps:
            if keys is not None and child in keys:
                unfusible.add(child)
            elif child in child2parent:
                # Second dependent seen: the child is no longer linear.
                del child2parent[child]
                unfusible.add(child)
            elif has_many_children:
                unfusible.add(child)
            elif child not in unfusible:
                child2parent[child] = parent

    # construct the chains from ancestor to descendant
    chains = []
    parent2child = dict(map(reversed, child2parent.items()))
    while child2parent:
        child, parent = child2parent.popitem()
        chain = [child, parent]
        # Extend upwards towards the ancestor...
        while parent in child2parent:
            parent = child2parent.pop(parent)
            del parent2child[parent]
            chain.append(parent)
        chain.reverse()
        # ...then downwards towards the descendant.
        while child in parent2child:
            child = parent2child.pop(child)
            del child2parent[child]
            chain.append(child)
        chains.append(chain)

    # From here on dependencies are mutated as sets.
    dependencies = {k: set(v) for k, v in dependencies.items()}

    if rename_keys is True:
        key_renamer = default_fused_linear_keys_renamer
    elif rename_keys is False:
        key_renamer = None
    else:
        key_renamer = rename_keys

    # create a new dask with fused chains
    rv = {}
    fused = set()
    aliases = set()
    is_renamed = False
    for chain in chains:
        if key_renamer is not None:
            new_key = key_renamer(chain)
            # Only rename when the proposed key doesn't collide with an
            # existing key in either the input or output graph.
            is_renamed = (
                new_key is not None and new_key not in dsk and new_key not in rv
            )
        # Fold the chain bottom-up, substituting each child task into its
        # parent and merging dependency sets as we go.
        child = chain.pop()
        val = dsk[child]
        while chain:
            parent = chain.pop()
            dependencies[parent].update(dependencies.pop(child))
            dependencies[parent].remove(child)
            val = subs(dsk[parent], child, val)
            fused.add(child)
            child = parent
        fused.add(child)
        if is_renamed:
            # Store the fused task under the new name and keep the original
            # root key as an alias pointing at it.
            rv[new_key] = val
            rv[child] = new_key
            dependencies[new_key] = dependencies[child]
            dependencies[child] = {new_key}
            aliases.add(child)
        else:
            rv[child] = val
    # Copy over all tasks that were not fused into a chain.
    for key, val in dsk.items():
        if key not in fused:
            rv[key] = val
    if aliases:
        # Rewire any task that referenced an aliased root key to point at
        # the renamed fused key instead.
        for key, deps in dependencies.items():
            for old_key in deps & aliases:
                new_key = rv[old_key]
                deps.remove(old_key)
                deps.add(new_key)
                rv[key] = subs(rv[key], old_key, new_key)
        if keys is not None:
            # Aliases not requested as outputs are no longer needed.
            for key in aliases - keys:
                del rv[key]
                del dependencies[key]
    return rv, dependencies

229 

230 

231def _flat_set(x): 

232 if x is None: 

233 return set() 

234 elif isinstance(x, set): 

235 return x 

236 elif not isinstance(x, (list, set)): 

237 x = [x] 

238 return set(x) 

239 

240 

def inline(dsk, keys=None, inline_constants=True, dependencies=None):
    """Return new dask with the given keys inlined with their values.

    Inlines all constants if ``inline_constants`` keyword is True. Note that
    the constant keys will remain in the graph, to remove them follow
    ``inline`` with ``cull``.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
    >>> inline(d)  # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}

    >>> inline(d, keys='y')  # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}

    >>> inline(d, keys='y', inline_constants=False)  # doctest: +ELLIPSIS
    {'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
    """
    # Dependencies supplied as lists (e.g. from ``cull``) are normalized to
    # sets so the ``keys & dependencies[key]`` intersections below work.
    if dependencies and isinstance(next(iter(dependencies.values())), list):
        dependencies = {k: set(v) for k, v in dependencies.items()}

    keys = _flat_set(keys)

    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k) for k in dsk}

    if inline_constants:
        # A "constant" is either an alias of another key or a dependency-free
        # non-task value.
        keys.update(
            k
            for k, v in dsk.items()
            if (ishashable(v) and v in dsk) or (not dependencies[k] and not istask(v))
        )

    # Keys may depend on other keys, so determine replace order with toposort.
    # The values stored in `keysubs` do not include other keys.
    replaceorder = toposort(
        {k: dsk[k] for k in keys if k in dsk}, dependencies=dependencies
    )
    keysubs = {}
    for key in replaceorder:
        val = dsk[key]
        for dep in keys & dependencies[key]:
            # Prefer the already-substituted value so nested inlines compose.
            if dep in keysubs:
                replace = keysubs[dep]
            else:
                replace = dsk[dep]
            val = subs(val, dep, replace)
        keysubs[key] = val

    # Make new dask with substitutions
    dsk2 = keysubs.copy()
    for key, val in dsk.items():
        if key not in dsk2:
            for item in keys & dependencies[key]:
                val = subs(val, item, keysubs[item])
            dsk2[key] = val
    return dsk2

305 

306 

def inline_functions(
    dsk, output, fast_functions=None, inline_constants=False, dependencies=None
):
    """Inline cheap functions into larger operations

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> double = lambda x: x * 2
    >>> dsk = {'out': (add, 'i', 'd'),  # doctest: +SKIP
    ...        'i': (inc, 'x'),
    ...        'd': (double, 'y'),
    ...        'x': 1, 'y': 1}
    >>> inline_functions(dsk, [], [inc])  # doctest: +SKIP
    {'out': (add, (inc, 'x'), 'd'),
     'd': (double, 'y'),
     'x': 1, 'y': 1}

    Protect output keys. In the example below ``i`` is not inlined because it
    is marked as an output key.

    >>> inline_functions(dsk, ['i', 'out'], [inc, double])  # doctest: +SKIP
    {'out': (add, 'i', (double, 'y')),
     'i': (inc, 'x'),
     'x': 1, 'y': 1}
    """
    # Nothing to do without a set of fast functions.
    if not fast_functions:
        return dsk

    output = set(output)

    fast_functions = set(fast_functions)

    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k) for k in dsk}
    dependents = reverse_dict(dependencies)

    def inlinable(key, task):
        # A task is inlinable when it is a legacy tuple-task (not a new-style
        # GraphNode), is not a protected output, has at least one dependent,
        # uses only fast functions, and none of its dependents are GraphNodes.
        if (
            not isinstance(task, GraphNode)
            and istask(task)
            and key not in output
            and dependents[key]
        ):
            try:
                if functions_of(task).issubset(fast_functions) and not any(
                    isinstance(dsk[d], GraphNode) for d in dependents[key]
                ):
                    return True
            except TypeError:
                # Unhashable functions cannot be checked against the set;
                # treat the task as not inlinable (deliberate best-effort).
                pass
        return False

    keys = [k for k, v in dsk.items() if inlinable(k, v)]

    if keys:
        dsk = inline(
            dsk, keys, inline_constants=inline_constants, dependencies=dependencies
        )
        # ``inline`` leaves the inlined keys in the graph; drop them here.
        for k in keys:
            del dsk[k]
    return dsk

370 

371 

def unwrap_partial(func):
    """Return the innermost callable of ``partial``-style wrappers.

    Any object exposing a ``func`` attribute (e.g. ``functools.partial``)
    is treated as a wrapper and followed until no ``func`` attribute
    remains.
    """
    _missing = object()
    inner = func
    while True:
        wrapped = getattr(inner, "func", _missing)
        if wrapped is _missing:
            return inner
        inner = wrapped

376 

377 

def functions_of(task):
    """Set of functions contained within nested task

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> mul = lambda x, y: x * y
    >>> task = (add, (mul, 1, 2), (inc, 3))  # doctest: +SKIP
    >>> functions_of(task)  # doctest: +SKIP
    set([add, mul, inc])
    """
    found = set()
    # Depth-first traversal over nested lists/tuples; only exact list and
    # tuple types are descended into.
    stack = [task]
    while stack:
        item = stack.pop()
        if type(item) in (list, tuple):
            if istask(item):
                # First element of a task tuple is the callable.
                found.add(unwrap_partial(item[0]))
                stack.extend(item[1:])
            else:
                stack.extend(item)
    return found

407 

408 

def default_fused_keys_renamer(keys, max_fused_key_length=120):
    """Create new keys for ``fuse`` tasks.

    The optional parameter `max_fused_key_length` is used to limit the maximum string length for each renamed key.
    If this parameter is set to `None`, there is no limit.

    Returns ``None`` implicitly for key types that are neither ``str`` nor
    a str-headed ``tuple``, signalling "do not rename" to the caller.
    """
    # ``keys`` is ordered leaf-first with the root key last; iterate in
    # reverse so the root key is examined first.
    it = reversed(keys)
    first_key = next(it)
    typ = type(first_key)

    if max_fused_key_length:  # Take into account size of hash suffix
        max_fused_key_length -= 5

    def _enforce_max_key_limit(key_name):
        # Truncate overlong names, appending a 4-hex-digit hash of the full
        # name to keep truncated keys distinguishable.
        if max_fused_key_length and len(key_name) > max_fused_key_length:
            name_hash = f"{hash(key_name):x}"[:4]
            key_name = f"{key_name[:max_fused_key_length]}-{name_hash}"
        return key_name

    if typ is str:
        # Deduplicated, sorted name prefixes of the fused keys, excluding
        # the root key's own prefix, followed by the full root key.
        first_name = utils.key_split(first_key)
        names = {utils.key_split(k) for k in it}
        names.discard(first_name)
        names = sorted(names)
        names.append(first_key)
        concatenated_name = "-".join(names)
        return _enforce_max_key_limit(concatenated_name)
    elif typ is tuple and len(first_key) > 0 and isinstance(first_key[0], str):
        # Tuple keys: rename the name component, preserve the index part.
        first_name = utils.key_split(first_key)
        names = {utils.key_split(k) for k in it}
        names.discard(first_name)
        names = sorted(names)
        names.append(first_key[0])
        concatenated_name = "-".join(names)
        return (_enforce_max_key_limit(concatenated_name),) + first_key[1:]

444 

445 

# PEP-484 compliant singleton constant
# https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions
class Default(Enum):
    """Sentinel type for "argument not supplied".

    A single-member Enum (per the PEP 484 recipe above) lets type checkers
    narrow ``Union[T, Default]`` with an ``is _default`` identity test.
    """

    token = 0

    def __repr__(self) -> str:
        return "<default>"


# The singleton sentinel instance used as a default argument value.
_default = Default.token

456 

457 

def fuse(
    dsk,
    keys=None,
    dependencies=None,
    ave_width=_default,
    max_width=_default,
    max_height=_default,
    max_depth_new_edges=_default,
    rename_keys=_default,
):
    """Fuse tasks that form reductions; more advanced than ``fuse_linear``

    This trades parallelism opportunities for faster scheduling by making tasks
    less granular.  It can replace ``fuse_linear`` in optimization passes.

    This optimization applies to all reductions--tasks that have at most one
    dependent--so it may be viewed as fusing "multiple input, single output"
    groups of tasks into a single task.  There are many parameters to fine
    tune the behavior, which are described below.  ``ave_width`` is the
    natural parameter with which to compare parallelism to granularity, so
    it should always be specified.  Reasonable values for other parameters
    will be determined using ``ave_width`` if necessary.

    Parameters
    ----------
    dsk: dict
        dask graph
    keys: list or set, optional
        Keys that must remain in the returned dask graph
    dependencies: dict, optional
        {key: [list-of-keys]}. Must be a list to provide count of each key
        This optional input often comes from ``cull``
    ave_width: float (default 1)
        Upper limit for ``width = num_nodes / height``, a good measure of
        parallelizability.
        dask.config key: ``optimization.fuse.ave-width``
    max_width: int (default infinite)
        Don't fuse if total width is greater than this.
        dask.config key: ``optimization.fuse.max-width``
    max_height: int or None (default None)
        Don't fuse more than this many levels. Set to None to dynamically
        adjust to ``1.5 + ave_width * log(ave_width + 1)``.
        dask.config key: ``optimization.fuse.max-height``
    max_depth_new_edges: int or None (default None)
        Don't fuse if new dependencies are added after this many levels.
        Set to None to dynamically adjust to ave_width * 1.5.
        dask.config key: ``optimization.fuse.max-depth-new-edges``
    rename_keys: bool or func, optional (default True)
        Whether to rename the fused keys with ``default_fused_keys_renamer``
        or not.  Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used.  For advanced usage, a
        function to create the new name is also accepted.
        dask.config key: ``optimization.fuse.rename-keys``

    Returns
    -------
    dsk
        output graph with keys fused
    dependencies
        dict mapping dependencies after fusion.  Useful side effect to accelerate other
        downstream optimizations.
    """

    # Perform low-level fusion unless the user has
    # specified False explicitly.
    if config.get("optimization.fuse.active") is False:
        return dsk, dependencies

    # Normalize ``keys`` to a flat set for O(1) membership tests.
    if keys is not None and not isinstance(keys, set):
        if not isinstance(keys, list):
            keys = [keys]
        keys = set(flatten(keys))

    # Read defaults from dask.yaml and/or user-defined config file
    if ave_width is _default:
        ave_width = config.get("optimization.fuse.ave-width")
        assert ave_width is not _default
    if max_height is _default:
        max_height = config.get("optimization.fuse.max-height")
        assert max_height is not _default
    if max_depth_new_edges is _default:
        max_depth_new_edges = config.get("optimization.fuse.max-depth-new-edges")
        assert max_depth_new_edges is not _default
    if max_depth_new_edges is None:
        max_depth_new_edges = ave_width * 1.5
    if max_width is _default:
        max_width = config.get("optimization.fuse.max-width")
        assert max_width is not _default
    if max_width is None:
        max_width = 1.5 + ave_width * math.log(ave_width + 1)

    # Zero/None width or height disables fusion entirely.
    if not ave_width or not max_height:
        return dsk, dependencies

    if rename_keys is _default:
        rename_keys = config.get("optimization.fuse.rename-keys")
        assert rename_keys is not _default
    if rename_keys is True:
        key_renamer = default_fused_keys_renamer
    elif rename_keys is False:
        key_renamer = None
    elif not callable(rename_keys):
        raise TypeError("rename_keys must be a boolean or callable")
    else:
        key_renamer = rename_keys
    # From here on, ``rename_keys`` is a plain bool flag.
    rename_keys = key_renamer is not None

    # Build dependencies as lists (to preserve duplicate counts), then
    # convert to sets below while building the reverse mapping.
    if dependencies is None:
        deps = {k: get_dependencies(dsk, k, as_list=True) for k in dsk}
    else:
        deps = {
            k: v if isinstance(v, list) else get_dependencies(dsk, k, as_list=True)
            for k, v in dependencies.items()
        }

    # rdeps: reverse dependencies, i.e. key -> list of dependents.
    rdeps = {}
    for k, vals in deps.items():
        for v in vals:
            if v not in rdeps:
                rdeps[v] = [k]
            else:
                rdeps[v].append(k)
        deps[k] = set(vals)

    # ``reducible``: keys with exactly one dependent that are safe to fuse
    # into their parent (not protected outputs, legacy tuple tasks or plain
    # values, and neither side a new-style GraphNode).
    reducible = set()
    for k, vals in rdeps.items():
        if (
            len(vals) == 1
            and k not in (keys or ())
            and k in dsk
            and not isinstance(dsk[k], GraphNode)
            and (type(dsk[k]) is tuple or isinstance(dsk[k], (numbers.Number, str)))
            and not any(isinstance(dsk[v], GraphNode) for v in vals)
        ):
            reducible.add(k)

    if not reducible and (all(len(set(v)) != 1 for v in rdeps.values())):
        # Quick return if there's nothing to do. Only progress if there's tasks
        # fusible by the main `fuse`
        return dsk, deps

    rv = dsk.copy()
    # fused_trees: root key -> ordered list of original keys fused into it
    # (used by the renamer at the end).
    fused_trees = {}
    # These are the stacks we use to store data as we traverse the graph
    # Each info_stack entry is the tuple:
    # (key, task, fused_keys, height, width, number of nodes, fudge, set of edges)
    info_stack = []
    children_stack = []
    # For speed
    deps_pop = deps.pop
    reducible_add = reducible.add
    reducible_pop = reducible.pop
    reducible_remove = reducible.remove
    fused_trees_pop = fused_trees.pop
    info_stack_append = info_stack.append
    info_stack_pop = info_stack.pop
    children_stack_append = children_stack.append
    children_stack_extend = children_stack.extend
    children_stack_pop = children_stack.pop
    while reducible:
        parent = reducible_pop()
        reducible_add(parent)
        while parent in reducible:
            # Go to the top
            parent = rdeps[parent][0]
        children_stack_append(parent)
        children_stack_extend(reducible & deps[parent])
        while True:
            child = children_stack[-1]
            if child != parent:
                children = reducible & deps[child]
                while children:
                    # Depth-first search
                    children_stack_extend(children)
                    parent = child
                    child = children_stack[-1]
                    children = reducible & deps[child]
                children_stack_pop()
                # This is a leaf node in the reduction region
                # key, task, fused_keys, height, width, number of nodes, fudge, set of edges
                info_stack_append(
                    (
                        child,
                        rv[child],
                        [child] if rename_keys else None,
                        1,
                        1,
                        1,
                        0,
                        deps[child] - reducible,
                    )
                )
            else:
                children_stack_pop()
                # Calculate metrics and fuse as appropriate
                deps_parent = deps[parent]
                edges = deps_parent - reducible
                children = deps_parent - edges
                num_children = len(children)

                if num_children == 1:
                    # Single reducible child: candidate for a linear fuse.
                    (
                        child_key,
                        child_task,
                        child_keys,
                        height,
                        width,
                        num_nodes,
                        fudge,
                        children_edges,
                    ) = info_stack_pop()
                    num_children_edges = len(children_edges)

                    # Clamp the fudge factor so it never exceeds the number
                    # of external edges the child region can parallelize over.
                    if fudge > num_children_edges - 1 >= 0:
                        fudge = num_children_edges - 1
                    edges |= children_edges
                    no_new_edges = len(edges) == num_children_edges
                    if not no_new_edges:
                        fudge += 1
                    if (
                        (num_nodes + fudge) / height <= ave_width
                        and
                        # Sanity check; don't go too deep if new levels introduce new edge dependencies
                        (no_new_edges or height < max_depth_new_edges)
                        and (
                            not isinstance(dsk[parent], GraphNode)
                            # TODO: substitute can be implemented with GraphNode.inline
                            # or isinstance(dsk[child_key], GraphNode)
                        )
                    ):
                        # Perform substitutions as we go
                        val = subs(dsk[parent], child_key, child_task)
                        deps_parent.remove(child_key)
                        deps_parent |= deps_pop(child_key)
                        del rv[child_key]
                        reducible_remove(child_key)
                        if rename_keys:
                            child_keys.append(parent)
                            fused_trees[parent] = child_keys
                            fused_trees_pop(child_key, None)

                        if children_stack:
                            if no_new_edges:
                                # Linear fuse
                                info_stack_append(
                                    (
                                        parent,
                                        val,
                                        child_keys,
                                        height,
                                        width,
                                        num_nodes,
                                        fudge,
                                        edges,
                                    )
                                )
                            else:
                                # New edges were introduced: the fused region
                                # counts as one level taller and one node larger.
                                info_stack_append(
                                    (
                                        parent,
                                        val,
                                        child_keys,
                                        height + 1,
                                        width,
                                        num_nodes + 1,
                                        fudge,
                                        edges,
                                    )
                                )
                        else:
                            rv[parent] = val
                            break
                    else:
                        # Fuse rejected: restore the child as its own task.
                        rv[child_key] = child_task
                        reducible_remove(child_key)
                        if children_stack:
                            # Allow the parent to be fused, but only under strict circumstances.
                            # Ensure that linear chains may still be fused.
                            if fudge > int(ave_width - 1):
                                fudge = int(ave_width - 1)
                            # This task *implicitly* depends on `edges`
                            info_stack_append(
                                (
                                    parent,
                                    rv[parent],
                                    [parent] if rename_keys else None,
                                    1,
                                    width,
                                    1,
                                    fudge,
                                    edges,
                                )
                            )
                        else:
                            break
                else:
                    # Multiple reducible children: aggregate their metrics
                    # before deciding whether to fuse the whole reduction.
                    child_keys = []
                    height = 1
                    width = 0
                    num_single_nodes = 0
                    num_nodes = 0
                    fudge = 0
                    children_edges = set()
                    max_num_edges = 0
                    children_info = info_stack[-num_children:]
                    del info_stack[-num_children:]
                    for (
                        _,
                        _,
                        _,
                        cur_height,
                        cur_width,
                        cur_num_nodes,
                        cur_fudge,
                        cur_edges,
                    ) in children_info:
                        if cur_height == 1:
                            num_single_nodes += 1
                        elif cur_height > height:
                            height = cur_height
                        width += cur_width
                        num_nodes += cur_num_nodes
                        fudge += cur_fudge
                        if len(cur_edges) > max_num_edges:
                            max_num_edges = len(cur_edges)
                        children_edges |= cur_edges
                    # Fudge factor to account for possible parallelism with the boundaries
                    num_children_edges = len(children_edges)
                    fudge += min(
                        num_children - 1, max(0, num_children_edges - max_num_edges)
                    )

                    if fudge > num_children_edges - 1 >= 0:
                        fudge = num_children_edges - 1
                    edges |= children_edges
                    no_new_edges = len(edges) == num_children_edges
                    if not no_new_edges:
                        fudge += 1
                    if (
                        (num_nodes + fudge) / height <= ave_width
                        and num_single_nodes <= ave_width
                        and width <= max_width
                        and height <= max_height
                        and
                        # Sanity check; don't go too deep if new levels introduce new edge dependencies
                        (no_new_edges or height < max_depth_new_edges)
                        and (
                            not isinstance(dsk[parent], GraphNode)
                            and not any(
                                isinstance(dsk[child_key], GraphNode)
                                for child_key in children
                            )
                            # TODO: substitute can be implemented with GraphNode.inline
                            # or all(
                            #     isinstance(dsk[child], GraphNode) for child in children
                        )
                    ):
                        # Perform substitutions as we go
                        val = dsk[parent]
                        children_deps = set()
                        for child_info in children_info:
                            cur_child = child_info[0]
                            val = subs(val, cur_child, child_info[1])
                            del rv[cur_child]
                            children_deps |= deps_pop(cur_child)
                            reducible_remove(cur_child)
                            if rename_keys:
                                fused_trees_pop(cur_child, None)
                                child_keys.extend(child_info[2])
                        deps_parent -= children
                        deps_parent |= children_deps

                        if rename_keys:
                            child_keys.append(parent)
                            fused_trees[parent] = child_keys

                        if children_stack:
                            info_stack_append(
                                (
                                    parent,
                                    val,
                                    child_keys,
                                    height + 1,
                                    width,
                                    num_nodes + 1,
                                    fudge,
                                    edges,
                                )
                            )
                        else:
                            rv[parent] = val
                            break
                    else:
                        # Fuse rejected: restore every child as its own task.
                        for child_info in children_info:
                            rv[child_info[0]] = child_info[1]
                            reducible_remove(child_info[0])
                        if children_stack:
                            # Allow the parent to be fused, but only under strict circumstances.
                            # Ensure that linear chains may still be fused.
                            if width > max_width:
                                width = max_width
                            if fudge > int(ave_width - 1):
                                fudge = int(ave_width - 1)
                            # key, task, height, width, number of nodes, fudge, set of edges
                            # This task *implicitly* depends on `edges`
                            info_stack_append(
                                (
                                    parent,
                                    rv[parent],
                                    [parent] if rename_keys else None,
                                    1,
                                    width,
                                    1,
                                    fudge,
                                    edges,
                                )
                            )
                        else:
                            break
                # Traverse upwards
                parent = rdeps[parent][0]

    if key_renamer:
        # Assign the human-readable fused names, keeping the original root
        # key as an alias so existing references remain valid.
        for root_key, fused_keys in fused_trees.items():
            alias = key_renamer(fused_keys)
            if alias is not None and alias not in rv:
                rv[alias] = rv[root_key]
                rv[root_key] = alias
                deps[alias] = deps[root_key]
                deps[root_key] = {alias}

    return rv, deps