Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/order.py: 5%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

415 statements  

1from __future__ import annotations 

2 

3r""" Static order of nodes in dask graph 

4 

5Dask makes decisions on what tasks to prioritize both 

6 

7* Dynamically at runtime 

8* Statically before runtime 

9 

10Dynamically we prefer to run tasks that were just made available. However when 

11several tasks become available at the same time we have an opportunity to break 

12ties in an intelligent way 

13 

14 d 

15 | 

16 b c 

17 \ / 

18 a 

19 

20For example after we finish ``a`` we can choose to run either ``b`` or ``c`` 

21next. Making small decisions like this can greatly affect our performance, 

22especially because the order in which we run tasks affects the order in which 

23we can release memory, which operationally we find to have a large effect on

24many computations. We want to run tasks in such a way that we keep only a small

25amount of data in memory at any given time. 

26 

27 

28Static Ordering 

29--------------- 

30 

31And so we create a total ordering over all nodes to serve as a tie breaker. We 

32represent this ordering with a dictionary mapping keys to integer values. 

33Lower scores have higher priority. These scores correspond to the order in 

34which a sequential scheduler would visit each node. 

35 

36 {'a': 0, 

37 'c': 1, 

38 'd': 2, 

39 'b': 3} 

40 

41There are several ways in which we might order our keys. This is a nuanced 

42process that has to take into account many different kinds of workflows, and 

43operate efficiently in linear time. We strongly recommend that readers look at 

44the docstrings of tests in dask/tests/test_order.py. These tests usually have 

45graph types laid out very carefully to show the kinds of situations that often 

46arise, and the order we would like to be determined. 

47 

48""" 

49from collections import defaultdict, deque, namedtuple 

50from collections.abc import Callable, Iterable, Mapping, MutableMapping 

51from typing import Any, Literal, NamedTuple, overload 

52 

53from dask._task_spec import DataNode, DependenciesMapping 

54from dask.core import get_deps, getcycle, istask, reverse_dict 

55from dask.typing import Key 

56 

57 

class Order(NamedTuple):
    """Per-key ordering information returned by ``order(..., return_stats=True)``.

    priority:
        Position of the key in the total ordering; lower values run earlier.
    critical_path:
        Value of the critical-path counter at the moment the key was assigned.
        Keys assigned while processing non-critical runnables carry a
        fractional (``.5``) value; leaf aliases stripped during graph
        normalization are marked with ``-1``.
    """

    priority: int
    critical_path: float | int

61 

62 

# Typing overload: with return_stats=True the result maps each key to an
# ``Order`` tuple (priority plus critical-path information).
@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[True],
) -> dict[Key, Order]: ...

70 

71 

# Typing overload: with return_stats=False (the default) the result maps each
# key to a bare integer priority.
@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[False] = False,
) -> dict[Key, int]: ...

79 

80 

def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: bool = False,
) -> dict[Key, Order] | dict[Key, int]:
    """Order nodes in dask graph

    This produces an ordering over our tasks that we use to break ties when
    executing. We do this ahead of time to reduce a bit of stress on the
    scheduler and also to assist in static analysis.

    This currently traverses the graph as a single-threaded scheduler would
    traverse it.

    Parameters
    ----------
    dsk:
        The dask graph to order.
    dependencies:
        Accepted for signature compatibility but not used: the mapping is
        unconditionally recomputed from ``dsk`` below.
    return_stats:
        If True, map each key to an ``Order`` tuple (priority plus
        critical-path counter) instead of a bare integer priority.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'a': 1, 'b': 2, 'c': (inc, 'a'), 'd': (add, 'b', 'c')}
    >>> order(dsk)
    {'a': 0, 'c': 1, 'b': 2, 'd': 3}
    """
    if not dsk:
        return {}

    # Shallow copy so the normalization below (deleting aliases / inserting
    # artificial data nodes) does not mutate the caller's graph.
    dsk = dict(dsk)

    # NOTE: the caller-provided ``dependencies`` is ignored; it is recomputed
    # here to guarantee consistency with ``dsk``.
    dependencies = DependenciesMapping(dsk)
    dependents = reverse_dict(dependencies)

    external_keys = set()
    if len(dependents) != len(dependencies):
        # There are external keys. Let's add a DataNode to ensure the algo below
        # is encountering a complete graph. Those artificial data nodes will be
        # ignored when assigning results
        for k in dependents:
            if k not in dependencies:
                external_keys.add(k)
                dsk[k] = DataNode(k, object())

    expected_len = len(dsk)
    # leafs: no dependents (user-requested keys); roots: no dependencies
    leaf_nodes = {k for k, v in dependents.items() if not v}
    root_nodes = {k for k, v in dependencies.items() if not v}

    result: dict[Key, Order | int] = {}

    # Normalize the graph by removing leaf nodes that are not actual tasks, see
    # for instance da.store where the task is merely an alias
    # to multiple keys, i.e. [key1, key2, ...,]

    # Similarly, we are removing root nodes that are pure data tasks. Those tasks
    # are embedded in the run_spec of a task and are not runnable. We have to
    # assign a priority but their priority has no impact on performance.
    # The removal of those tasks typically transforms the graph topology in a
    # way that is simpler to handle
    all_tasks = False
    n_removed_leaves = 0
    requires_data_task = defaultdict(set)

    # Repeat until no alias leaf is removed anymore (removing a leaf can
    # expose a new leaf that is itself an alias).
    while not all_tasks:
        all_tasks = True
        for leaf in list(leaf_nodes):
            if leaf in root_nodes:
                continue
            if (
                not istask(dsk[leaf])
                # Having a linear chain is fine
                and len(dependencies[leaf]) > 1
            ):
                all_tasks = False
                # Put non-tasks at the very end since they are merely aliases
                # and have no impact on performance at all
                prio = len(dsk) - 1 - n_removed_leaves
                if return_stats:
                    result[leaf] = Order(prio, -1)
                else:
                    result[leaf] = prio
                n_removed_leaves += 1
                leaf_nodes.remove(leaf)
                for dep in dependencies[leaf]:
                    dependents[dep].remove(leaf)
                    if not dependents[dep]:
                        leaf_nodes.add(dep)
                del dsk[leaf]
                del dependencies[leaf]
                del dependents[leaf]

    # Strip pure-data roots; remember them in requires_data_task so they are
    # assigned a priority right before their first dependent runs.
    for root in list(root_nodes):
        if root in leaf_nodes:
            continue
        deps_root = dependents[root]
        if not istask(dsk[root]) and len(deps_root) > 1:
            del dsk[root]
            del dependencies[root]
            root_nodes.remove(root)
            del dependents[root]
            for dep in deps_root:
                requires_data_task[dep].add(root)
                if not dependencies[dep]:
                    root_nodes.add(dep)

    num_needed, total_dependencies = ndependencies(dependencies, dependents)
    if len(total_dependencies) != len(dsk):
        # ndependencies only assigns totals to keys reachable from the roots;
        # a shortfall therefore implies a cycle.
        cycle = getcycle(dsk, None)
        raise RuntimeError(
            "Cycle detected between the following keys:\n  ->  %s"
            % "\n  ->  ".join(str(x) for x in cycle)
        )
    assert dependencies is not None
    roots_connected, max_dependents = _connecting_to_roots(dependencies, dependents)
    # Swapping the arguments computes descendants (connected leafs) instead.
    leafs_connected, _ = _connecting_to_roots(dependents, dependencies)
    i = 0

    runnable_hull = set()
    reachable_hull = set()

    runnable: list[Key] = []

    known_runnable_paths: dict[Key, list[list[Key]]] = {}
    known_runnable_paths_pop = known_runnable_paths.pop

    crit_path_counter = 0
    scrit_path: set[Key] = set()
    _crit_path_counter_offset: int | float = 0
    _sort_keys_cache: dict[Key, tuple[int, int, int, int, str]] = {}

    def sort_key(x: Key) -> tuple[int, int, int, int, str]:
        # Memoized tie-breaking key: fewer total dependencies, fewer direct
        # dependencies, fewer connected roots, larger max_dependents, then the
        # key's string representation as a deterministic last resort.
        try:
            return _sort_keys_cache[x]
        except KeyError:
            assert dependencies is not None
            _sort_keys_cache[x] = rv = (
                total_dependencies[x],
                len(dependencies[x]),
                len(roots_connected[x]),
                -max_dependents[x],
                # Converting to str is actually faster than creating some
                # wrapper class and comparisons that come this far are
                # relatively rare so we prefer fast init over fast comparison
                str(x),
            )
            return rv

    def add_to_result(item: Key) -> None:
        """Assign the next priority to ``item`` and to every dependent that
        becomes the sole newly-runnable follower, updating all bookkeeping
        (hulls, leaf degrees, num_needed)."""
        # Earlier versions recursed into this method but this could cause
        # recursion depth errors. This is the only reason for the while loop
        next_items = [item]
        nonlocal i
        nonlocal min_leaf_degree  # type: ignore[misc]
        while next_items:
            item = next_items.pop()
            runnable_hull.discard(item)
            reachable_hull.discard(item)
            leaf_nodes.discard(item)
            if item in result:
                continue

            # Stripped data roots must receive a priority just before their
            # first consumer.
            while requires_data_task[item]:
                add_to_result(requires_data_task[item].pop())
            if return_stats:
                result[item] = Order(i, crit_path_counter - _crit_path_counter_offset)
            else:
                result[item] = i
            # Artificial external-key data nodes don't consume a priority slot.
            if item not in external_keys:
                i += 1
            if item in root_nodes:
                # Loading this root brings every connected leaf one root
                # closer to being accessible; maintain the degree buckets.
                for leaf in leafs_connected[item]:
                    if leaf in leaf_nodes:
                        degree = leafs_degree.pop(leaf)
                        leafs_per_degree[degree].remove(leaf)

                        new_degree = degree - 1
                        if new_degree > 0:
                            if new_degree < min_leaf_degree:
                                min_leaf_degree = new_degree
                            leafs_per_degree[new_degree].add(leaf)
                            leafs_degree[leaf] = new_degree
                        elif not leafs_per_degree[degree]:
                            assert degree == min_leaf_degree
                            while min_leaf_degree != max_leaf_degree and (
                                min_leaf_degree not in leafs_per_degree
                                or not leafs_per_degree[min_leaf_degree]
                            ):
                                min_leaf_degree += 1

            # Note: This is a `set` and therefore this introduces a certain
            # randomness. However, this randomness should not have any impact on
            # the final result since `process_runnables` should produce
            # equivalent results regardless of the order in which runnable is
            # populated (not identical but equivalent)
            for dep in dependents.get(item, ()):
                num_needed[dep] -= 1
                reachable_hull.add(dep)
                if not num_needed[dep]:
                    if len(dependents[item]) == 1:
                        next_items.append(dep)
                    else:
                        runnable.append(dep)

    def _with_offset(func: Callable[..., None]) -> Callable[..., None]:
        # This decorator is only used to reduce indentation levels. The offset
        # is purely cosmetic and used for some visualizations and I haven't
        # settled on how to implement this best so I didn't want to have large
        # indentations that make things harder to read
        def wrapper(*args: Any, **kwargs: Any) -> None:
            nonlocal _crit_path_counter_offset
            _crit_path_counter_offset = 0.5
            try:
                func(*args, **kwargs)
            finally:
                _crit_path_counter_offset = 0

        return wrapper

    @_with_offset
    def process_runnables() -> None:
        """Compute all currently runnable paths and either cache or execute them

        This is designed to ensure we are running tasks that are free to execute
        (e.g. the result of a splitter task) not too eagerly. If we executed
        such free tasks too early we'd be walking the graph in a too wide /
        breadth first fashion that is not optimal. If instead we were to only
        execute them once they are needed for a final result, this can cause
        very high memory pressure since valuable reducers are executed too
        late.

        The strategy here is to take all runnable tasks and walk forwards until
        we hit a reducer node (i.e. a node with more than one dependency). We
        will remember/cache the path to this reducer node.
        If this path leads to a leaf or if we find enough runnable paths for a
        reducer to be runnable, we will execute the path.

        If instead of a reducer a splitter is encountered that is runnable, we
        will follow its splitter paths individually and apply the same logic to
        each branch.
        """
        while runnable:
            candidates = runnable.copy()
            runnable.clear()
            while candidates:
                key = candidates.pop()
                if key in runnable_hull or key in result:
                    continue
                if key in leaf_nodes:
                    add_to_result(key)
                    continue
                path = [key]
                branches = deque([(0, path)])

                while branches:
                    nsplits, path = branches.popleft()
                    while True:
                        # Loop invariant. Too expensive to compute at runtime
                        # assert not set(known_runnable_paths).intersection(runnable_hull)
                        current = path[-1]
                        runnable_hull.add(current)
                        deps_downstream = dependents[current]
                        deps_upstream = dependencies[current]
                        if not deps_downstream:
                            # FIXME: The fact that it is possible for
                            # num_needed[current] == 0 means we're doing some
                            # work twice
                            if num_needed[current] <= 1:
                                for k in path:
                                    add_to_result(k)
                            else:
                                runnable_hull.discard(current)
                        elif len(path) == 1 or len(deps_upstream) == 1:
                            if len(deps_downstream) > 1:
                                nsplits += 1
                                for d in sorted(deps_downstream, key=sort_key):
                                    # This ensures we're only considering splitters
                                    # that are genuinely splitting and not
                                    # interleaving
                                    if len(dependencies[d]) == 1:
                                        branch = path.copy()
                                        branch.append(d)
                                        branches.append((nsplits, branch))
                                break
                            path.extend(deps_downstream)
                            continue
                        elif current in known_runnable_paths:
                            known_runnable_paths[current].append(path)
                            runnable_hull.discard(current)
                            if (
                                len(known_runnable_paths[current])
                                >= num_needed[current]
                            ):
                                # Enough paths reached this reducer: prune the
                                # stale ones and either merge branches or run.
                                pruned_branches: deque[list[Key]] = deque()
                                for path in known_runnable_paths_pop(current):
                                    if path[-2] not in result:
                                        pruned_branches.append(path)
                                if len(pruned_branches) < num_needed[current]:
                                    known_runnable_paths[current] = list(
                                        pruned_branches
                                    )
                                else:
                                    if nsplits > 1:
                                        path = []
                                        for pruned in pruned_branches:
                                            path.extend(pruned)
                                        branches.append((nsplits - 1, path))
                                        break

                                    while pruned_branches:
                                        path = pruned_branches.popleft()
                                        for k in path:
                                            if num_needed[k]:
                                                pruned_branches.append(path)
                                                break
                                            add_to_result(k)
                        elif (
                            len(dependencies[current]) > 1 and num_needed[current] <= 1
                        ):
                            for k in path:
                                add_to_result(k)
                        else:
                            known_runnable_paths[current] = [path]
                            runnable_hull.discard(current)
                        break

    # Pick strategy
    # Note: We're trying to be smart here by picking a strategy on how to
    # determine the critical path. This is not always clear and we may want to
    # consider just calculating both orderings and picking the one with less
    # pressure. The only concern to this would be performance but at time of
    # writing, the most expensive part of ordering is the prep work (mostly
    # connected roots + sort_key) which can be reused for multiple orderings.

    # Degree in this context is the number of root nodes that have to be loaded
    # for this leaf to become accessible. Zero means the leaf is already
    # accessible in which case it _should_ either already be in result or be
    # accessible via process_runnables
    # When picking a new target, we prefer the leafs with the least number of
    # roots that need loading.
    leafs_degree = {}
    leafs_per_degree = defaultdict(set)
    min_leaf_degree = len(dsk)
    max_leaf_degree = len(dsk)
    for leaf in leaf_nodes - root_nodes:
        degree = len(roots_connected[leaf])
        min_leaf_degree = min(min_leaf_degree, degree)
        max_leaf_degree = max(max_leaf_degree, degree)
        leafs_degree[leaf] = degree
        leafs_per_degree[degree].add(leaf)

    def get_target() -> Key:
        """Pick the next leaf to target with a critical path."""
        # If we're already mid run and there is a runnable_hull we'll attempt to
        # pick the next target in a way that minimizes the number of additional
        # root nodes that are needed
        all_leafs_accessible = min_leaf_degree == max_leaf_degree
        is_trivial_lookup = not reachable_hull or all_leafs_accessible
        if not is_trivial_lookup:
            candidates = reachable_hull & leafs_per_degree[min_leaf_degree]
            if not candidates:
                candidates = leafs_per_degree[min_leaf_degree]
            # Even without reachable hull overlap this should be relatively
            # small so one full pass should be fine
            return min(candidates, key=sort_key)
        else:
            return leaf_nodes_sorted.pop()

    def use_longest_path() -> bool:
        size = 0
        # Heavy reducer / splitter topologies often benefit from a very
        # traditional critical path that expresses the longest chain of
        # tasks.
        if abs(len(root_nodes) - len(leaf_nodes)) / len(root_nodes) < 0.8:
            # If the graph stays about the same, we are checking for symmetry
            # and choose a "quickest path first" approach if the graph appears
            # to be asymmetrical
            for r in root_nodes:
                if not size:
                    size = len(leafs_connected[r])
                elif size != len(leafs_connected[r]):
                    return False

        return True

    # Some topologies benefit if the node with the most dependencies
    # is used as first choice, others benefit from the opposite.
    longest_path = use_longest_path()
    leaf_nodes_sorted = sorted(leaf_nodes, key=sort_key, reverse=not longest_path)

    # *************************************************************************
    # CORE ALGORITHM STARTS HERE
    #
    # 0. Nomenclature
    #
    #   - roots: Nodes that have no dependencies (i.e. typically data producers)
    #   - leafs: Nodes that have no dependents (i.e. user requested keys)
    #   - critical_path: The strategic path through the graph.
    #   - walking forwards: Starting from a root node we walk the graph as if we
    #     were to compute the individual nodes, i.e. along dependents
    #   - walking backwards: Starting from a leaf node we walk the graph in
    #     reverse direction, i.e. along dependencies
    #   - runnable: Nodes that are ready to be computed because all their
    #     dependencies are in result
    #   - runnable_hull: Nodes that could be reached and executed without
    #     "walking back". This typically means that these are tasks that can be
    #     executed without loading additional data/executing additional root
    #     nodes
    #   - reachable_hull: Nodes that are touching the result, i.e. all nodes in
    #     reachable_hull have at least one dependency in result
    #
    # A. Build the critical path
    #
    #   To build the critical path we will use a provided `get_target` function
    #   that returns a node that is anywhere in the graph, typically a leaf
    #   node. This node is not required to be runnable. We will walk the graph
    #   backwards, i.e. from leafs to roots and append nodes to the graph as we
    #   go. The critical path is a
    #   linear path in the graph. While this is a viable strategy, it is not
    #   required for the critical path to be a classical "longest path" but it
    #   can define any route through the graph that should be considered as top
    #   priority.
    #
    #   1. Determine the target node by calling ``get_target`` and append the
    #      target to the critical path stack
    #   2. Take the _most valuable_ (max given a `sort_key`) of its dependents
    #      and append it to the critical path stack. This key is the new target.
    #   3. Repeat step 2 until we reach a node that has no dependencies and is
    #      therefore runnable
    #
    # B. Walk the critical path
    #
    #   Only the first element of the critical path is an actually runnable node
    #   and this is where we're starting the sort. Strategically, this is the
    #   most important goal to achieve but since not all of the nodes are
    #   immediately runnable we have to walk back and compute other nodes first
    #   before we can unlock the critical path. This typically requires us also
    #   to load more data / run more root tasks.
    #   While walking the critical path we will also unlock non-critical tasks
    #   that could be run but are not contributing to our strategic goal. Under
    #   certain circumstances, those runnable tasks are allowed to be run right
    #   away to reduce memory pressure. This is described in more detail in
    #   `process_runnables`.
    #   Given this, the algorithm is as follows:
    #
    #   1. Pop the first element of the critical path
    #   2a. If the node is already in the result, continue
    #   2b. If the node is not runnable, we will put it back on the stack and
    #       put all its dependencies on the stack and continue with step 1. This
    #       is what we refer to as "walking back"
    #   2c. Else, we add the node to the result
    #   3. If we previously had to walk back we will consider running
    #      non-critical tasks (by calling process_runnables)
    #   4a. If critical path is not empty, repeat step 1
    #   4b. Go back to A.) and build a new critical path given a new target that
    #       accounts for the already computed nodes.
    #
    # *************************************************************************

    # critical_path is the ordered stack; scrit_path mirrors it as a set for
    # O(1) membership tests. The path_* helpers keep the two in sync.
    critical_path: list[Key] = []
    cpath_append = critical_path.append
    scpath_add = scrit_path.add

    def path_append(item: Key) -> None:
        cpath_append(item)
        scpath_add(item)

    scpath_update = scrit_path.update
    cpath_extend = critical_path.extend

    def path_extend(items: Iterable[Key]) -> None:
        cpath_extend(items)
        scpath_update(items)

    cpath_pop = critical_path.pop
    scpath_discard = scrit_path.discard

    def path_pop() -> Key:
        item = cpath_pop()
        scpath_discard(item)
        return item

    while len(result) < expected_len:
        crit_path_counter += 1
        assert not critical_path
        assert not scrit_path

        # A. Build the critical path
        target = get_target()
        next_deps = dependencies[target]
        path_append(target)

        while next_deps:
            item = max(next_deps, key=sort_key)
            path_append(item)
            next_deps = dependencies[item].difference(result)
            path_extend(next_deps)

        # B. Walk the critical path

        walked_back = False
        while critical_path:
            item = path_pop()
            if item in result:
                continue
            if num_needed[item]:
                # Not runnable yet: push it back and walk back along its
                # missing dependencies (cheapest first thanks to sort_key).
                path_append(item)
                deps = dependencies[item].difference(result)
                unknown: list[Key] = []
                known: list[Key] = []
                k_append = known.append
                uk_append = unknown.append
                for d in sorted(deps, key=sort_key):
                    if d in known_runnable_paths:
                        k_append(d)
                    else:
                        uk_append(d)
                if len(unknown) > 1:
                    walked_back = True

                for d in unknown:
                    path_append(d)
                for d in known:
                    # Splice previously cached runnable paths into the
                    # critical path so they are executed in order.
                    for path in known_runnable_paths_pop(d):
                        path_extend(reversed(path))

                del deps
                continue
            else:
                if walked_back and len(runnable) < len(critical_path):
                    process_runnables()
                add_to_result(item)
        process_runnables()

    assert len(result) == expected_len
    # Artificial data nodes for external keys must not appear in the output.
    for k in external_keys:
        del result[k]
    return result  # type: ignore

615 

616 

def _connecting_to_roots(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, frozenset[Key]], dict[Key, int]]:
    """Determine for every node which root nodes are connected to it (i.e.
    ancestors). If arguments of dependencies and dependents are switched, this
    can also be used to determine which leaf nodes are connected to which node
    (i.e. descendants).

    Also computes a weight that is defined as (cheaper to compute here)

    `max(len(dependents[k]) for k in connected_roots[key])`

    Notes
    -----
    Root nodes themselves are mapped to an *empty* frozenset in the returned
    mapping (see the final loop), even though they are seeded with ``{k}``
    during the traversal so their descendants inherit them.
    """
    result = {}
    current = []
    # Remaining unprocessed dependencies per non-root node; a node is visited
    # once this counter reaches zero (topological traversal).
    num_needed = {k: len(v) for k, v in dependencies.items() if v}
    max_dependents = {}
    roots = set()
    for k, v in dependencies.items():
        if not v:
            # Note: Hashing the full keys is relatively expensive. Hashing
            # integers would be much faster so this could be sped up by just
            # introducing a counter here. However, the order algorithm is also
            # sometimes interested in the actual keys and the only way to
            # benefit from the speedup of using integers would be to convert
            # this back on demand which makes the code very hard to read.
            roots.add(k)
            result[k] = frozenset({k})
            deps = dependents[k]
            max_dependents[k] = len(deps)
            for child in deps:
                num_needed[child] -= 1
                if not num_needed[child]:
                    current.append(child)

    # Cache of identical root-sets so equal sets are shared by identity, which
    # both saves memory and enables the id()-based dedup below.
    dedup_mapping: dict[frozenset[Key], frozenset[Key]] = {}
    while current:
        key = current.pop()
        if key in result:
            continue
        for parent in dependents[key]:
            num_needed[parent] -= 1
            if not num_needed[parent]:
                current.append(parent)
        # At some point, all the roots are the same, particularly for dense
        # graphs. We don't want to create new sets over and over again
        transitive_deps = []
        transitive_deps_ids = set()
        max_dependents_key = list()
        for child in dependencies[key]:
            r_child = result[child]
            # Children sharing the same set object contribute nothing new;
            # skipping them also skips their max_dependents entry (the weight
            # is a heuristic, so this is acceptable).
            if id(r_child) in transitive_deps_ids:
                continue
            transitive_deps.append(r_child)
            transitive_deps_ids.add(id(r_child))
            max_dependents_key.append(max_dependents[child])

        max_dependents[key] = max(max_dependents_key)
        if len(transitive_deps_ids) == 1:
            # Single distinct ancestor set: share it instead of copying.
            result[key] = transitive_deps[0]
        else:
            d = transitive_deps[0]
            if all(tdeps.issubset(d) for tdeps in transitive_deps[1:]):
                # First set already covers the rest: share it.
                result[key] = d
            else:
                res = set(d)
                for tdeps in transitive_deps[1:]:
                    res.update(tdeps)
                # frozenset is unfortunately triggering a copy. In the event of
                # a cache hit, this is wasted time but we can't hash the set
                # otherwise (unless we did it manually) and can therefore not
                # deduplicate without this copy
                frozen_res = frozenset(res)
                del res, tdeps
                try:
                    result[key] = dedup_mapping[frozen_res]
                except KeyError:
                    dedup_mapping[frozen_res] = frozen_res
                    result[key] = frozen_res
    del dedup_mapping

    empty_set: frozenset[Key] = frozenset()
    for r in roots:
        result[r] = empty_set
    return result, max_dependents

702 

703 

def ndependencies(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, int], dict[Key, int]]:
    """Number of total data elements on which this key depends

    For each key we return the number of tasks that must be run for us to run
    this task.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> num_dependencies, total_dependencies = ndependencies(dependencies, dependents)
    >>> sorted(total_dependencies.items())
    [('a', 1), ('b', 2), ('c', 3)]

    Returns
    -------
    num_dependencies: Dict[key, int]
    total_dependencies: Dict[key, int]
    """
    # Count direct dependencies and seed the totals with the roots, which
    # depend only on themselves.
    remaining = {}
    totals = {}
    for key, deps in dependencies.items():
        remaining[key] = len(deps)
        if not deps:
            totals[key] = 1

    # Snapshot of the direct-dependency counts before the topological sweep
    # below consumes ``remaining``.
    num_dependencies = dict(remaining)

    # Kahn-style traversal: a key becomes ready once all of its dependencies
    # have received a total.
    ready: list[Key] = []
    for root in totals:
        for parent in dependents[root]:
            remaining[parent] -= 1
            if not remaining[parent]:
                ready.append(parent)
    while ready:
        key = ready.pop()
        totals[key] = 1 + sum(totals[child] for child in dependencies[key])
        for parent in dependents[key]:
            remaining[parent] -= 1
            if not remaining[parent]:
                ready.append(parent)
    return num_dependencies, totals

751 

752 

# Per-key runtime metrics produced by ``diagnostics``.
OrderInfo = namedtuple(
    "OrderInfo",
    [
        "order",  # position in which the key is run
        "age",  # how long the key's output is held in memory
        "num_data_when_run",  # outputs held when the key starts
        "num_data_when_released",  # outputs held when the output is released
        "num_dependencies_freed",  # dependencies freed by running the key
    ],
)

763 

764 

def diagnostics(
    dsk: MutableMapping[Key, Any],
    o: Mapping[Key, int] | None = None,
    dependencies: MutableMapping[Key, set[Key]] | None = None,
) -> tuple[dict[Key, OrderInfo], list[int]]:
    """Simulate runtime metrics as though running tasks one at a time in order.

    These diagnostics can help reveal behaviors of and issues with ``order``.

    Returns a dict of `namedtuple("OrderInfo")` and a list of the number of outputs held over time.

    OrderInfo fields:
    - order : the order in which the node is run.
    - age : how long the output of a node is held.
    - num_data_when_run : the number of outputs held in memory when a node is run.
    - num_data_when_released : the number of outputs held in memory when the output is released.
    - num_dependencies_freed : the number of dependencies freed by running the node.
    """
    if dependencies is None:
        dependencies, dependents = get_deps(dsk)
    else:
        dependents = reverse_dict(dependencies)
    assert dependencies is not None
    if o is None:
        o = order(dsk, dependencies=dependencies, return_stats=False)

    pressure = []
    held = 0  # outputs currently kept in memory
    age = {}
    run_pressure = {}
    release_pressure = {}
    freed_counts = {}
    # Remaining consumers per key; a key's output is released once all of its
    # dependents have run.
    waiting = {key: len(deps) for key, deps in dependents.items()}
    for step, key in enumerate(sorted(dsk, key=o.__getitem__)):
        pressure.append(held)
        run_pressure[key] = held
        released = 0
        for dep in dependencies[key]:
            waiting[dep] -= 1
            if waiting[dep] == 0:
                age[dep] = step - o[dep]
                release_pressure[dep] = held
                released += 1
        freed_counts[key] = released
        if dependents[key]:
            # The new output is retained: net change is +1 minus what we freed.
            held -= released - 1
        else:
            # Terminal key: its output is released immediately.
            age[key] = 0
            release_pressure[key] = held
            held -= released

    summary = {
        key: OrderInfo(
            priority,
            age[key],
            run_pressure[key],
            release_pressure[key],
            freed_counts[key],
        )
        for key, priority in o.items()
    }
    return summary, pressure

823 

824 

825def _f() -> None: ... 

826 

827 

def sanitize_dsk(dsk: MutableMapping[Key, Any]) -> dict:
    """Take a dask graph and replace callables with a dummy function and remove
    payload data like numpy arrays, dataframes, etc.
    """
    from dask._task_spec import Task, TaskRef

    dep_map = DependenciesMapping(dsk)
    # Keep only the topology: every task becomes a dummy call referencing the
    # same dependencies as the original.
    return {
        key: Task(key, _f, *(TaskRef(dep) for dep in dep_keys))
        for key, dep_keys in dep_map.items()
    }

838 return new