Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/order.py: 5%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

415 statements  

1from __future__ import annotations 

2 

3r""" Static order of nodes in dask graph 

4 

5Dask makes decisions on what tasks to prioritize both 

6 

7* Dynamically at runtime 

8* Statically before runtime 

9 

10Dynamically we prefer to run tasks that were just made available. However when 

11several tasks become available at the same time we have an opportunity to break 

12ties in an intelligent way 

13 

14 d 

15 | 

16 b c 

17 \ / 

18 a 

19 

20For example after we finish ``a`` we can choose to run either ``b`` or ``c`` 

21next. Making small decisions like this can greatly affect our performance, 

22especially because the order in which we run tasks affects the order in which 

23we can release memory, which operationally we find to have a large effect on 

24many computations. We want to run tasks in such a way that we keep only a small 

25amount of data in memory at any given time. 

26 

27 

28Static Ordering 

29--------------- 

30 

31And so we create a total ordering over all nodes to serve as a tie breaker. We 

32represent this ordering with a dictionary mapping keys to integer values. 

33Lower scores have higher priority. These scores correspond to the order in 

34which a sequential scheduler would visit each node. 

35 

36 {'a': 0, 

37 'c': 1, 

38 'd': 2, 

39 'b': 3} 

40 

41There are several ways in which we might order our keys. This is a nuanced 

42process that has to take into account many different kinds of workflows, and 

43operate efficiently in linear time. We strongly recommend that readers look at 

44the docstrings of tests in dask/tests/test_order.py. These tests usually have 

45graph types laid out very carefully to show the kinds of situations that often 

46arise, and the order we would like to be determined. 

47 

48""" 

49from collections import defaultdict, deque, namedtuple 

50from collections.abc import Callable, Iterable, Mapping, MutableMapping 

51from typing import Any, Literal, NamedTuple, overload 

52 

53from dask._task_spec import DataNode, DependenciesMapping 

54from dask.core import get_deps, getcycle, istask, reverse_dict 

55from dask.typing import Key 

56 

57 

class Order(NamedTuple):
    """Ordering record returned per key by ``order(..., return_stats=True)``.

    priority
        Position of the key in the total order; lower values run earlier.
    critical_path
        Value of the critical-path counter when the key was assigned
        (offset by 0.5 while non-critical runnables are processed);
        -1 for alias leaf nodes stripped during graph normalization.
    """

    priority: int
    critical_path: float | int

62 

# Overload: with return_stats=True the mapping values are Order records
# carrying both the priority and critical-path statistics.
@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[True],
) -> dict[Key, Order]: ...

70 

71 

# Overload: the default (return_stats=False) maps each key to its plain
# integer priority.
@overload
def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: Literal[False] = False,
) -> dict[Key, int]: ...

79 

80 

def order(
    dsk: Mapping[Key, Any],
    dependencies: Mapping[Key, set[Key]] | None = None,
    *,
    return_stats: bool = False,
) -> dict[Key, Order] | dict[Key, int]:
    """Order nodes in dask graph

    This produces an ordering over our tasks that we use to break ties when
    executing.  We do this ahead of time to reduce a bit of stress on the
    scheduler and also to assist in static analysis.

    This currently traverses the graph as a single-threaded scheduler would
    traverse it.

    Parameters
    ----------
    dsk
        Dask graph mapping keys to task specs.
    dependencies
        NOTE(review): currently ignored — dependencies are always recomputed
        from ``dsk`` via ``DependenciesMapping`` below; kept for interface
        compatibility.
    return_stats
        If True, map each key to an :class:`Order` record (priority plus
        critical-path counter) instead of a bare integer priority.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'a': 1, 'b': 2, 'c': (inc, 'a'), 'd': (add, 'b', 'c')}
    >>> order(dsk)
    {'a': 0, 'c': 1, 'b': 2, 'd': 3}
    """
    if not dsk:
        return {}

    dsk = dict(dsk)

    dependencies = DependenciesMapping(dsk)
    dependents = reverse_dict(dependencies)

    external_keys = set()
    if len(dependents) != len(dependencies):
        # There are external keys. Let's add a DataNode to ensure the algo below
        # is encountering a complete graph. Those artificial data nodes will be
        # ignored when assigning results
        for k in dependents:
            if k not in dependencies:
                external_keys.add(k)
                dsk[k] = DataNode(k, object())

    expected_len = len(dsk)
    leaf_nodes = {k for k, v in dependents.items() if not v}
    root_nodes = {k for k, v in dependencies.items() if not v}

    result: dict[Key, Order | int] = {}

    # Normalize the graph by removing leaf nodes that are not actual tasks, see
    # for instance da.store where the task is merely an alias
    # to multiple keys, i.e. [key1, key2, ...,]

    # Similarly, we are removing root nodes that are pure data tasks. Those task
    # are embedded in the run_spec of a task and are not runnable. We have to
    # assign a priority but their priority has no impact on performance.
    # The removal of those tasks typically transforms the graph topology in a
    # way that is simpler to handle
    all_tasks = False
    n_removed_leaves = 0
    requires_data_task = defaultdict(set)

    while not all_tasks:
        all_tasks = True
        for leaf in list(leaf_nodes):
            if leaf in root_nodes:
                continue
            if (
                not istask(dsk[leaf])
                # Having a linear chain is fine
                and len(dependencies[leaf]) > 1
            ):
                all_tasks = False
                # Put non-tasks at the very end since they are merely aliases
                # and have no impact on performance at all
                prio = len(dsk) - 1 - n_removed_leaves
                if return_stats:
                    result[leaf] = Order(prio, -1)
                else:
                    result[leaf] = prio
                n_removed_leaves += 1
                leaf_nodes.remove(leaf)
                for dep in dependencies[leaf]:
                    dependents[dep].remove(leaf)
                    if not dependents[dep]:
                        leaf_nodes.add(dep)
                del dsk[leaf]
                del dependencies[leaf]
                del dependents[leaf]

    for root in list(root_nodes):
        if root in leaf_nodes:
            continue
        deps_root = dependents[root]
        if not istask(dsk[root]) and len(deps_root) > 1:
            del dsk[root]
            del dependencies[root]
            root_nodes.remove(root)
            del dependents[root]
            for dep in deps_root:
                # Remember the stripped data root so it can be assigned a
                # priority right before its first consumer runs.
                requires_data_task[dep].add(root)
                if not dependencies[dep]:
                    root_nodes.add(dep)

    num_needed, total_dependencies = ndependencies(dependencies, dependents)
    if len(total_dependencies) != len(dsk):
        cycle = getcycle(dsk, None)
        raise RuntimeError(
            "Cycle detected between the following keys:\n  -> {}".format(
                "\n  -> ".join(str(x) for x in cycle)
            )
        )
    assert dependencies is not None
    roots_connected, max_dependents = _connecting_to_roots(dependencies, dependents)
    leafs_connected, _ = _connecting_to_roots(dependents, dependencies)
    # `i` is the next priority to hand out; incremented once per real
    # (non-artificial) key added to `result`.
    i = 0

    runnable_hull = set()
    reachable_hull = set()

    runnable: list[Key] = []

    known_runnable_paths: dict[Key, list[list[Key]]] = {}
    known_runnable_paths_pop = known_runnable_paths.pop

    crit_path_counter = 0
    scrit_path: set[Key] = set()
    _crit_path_counter_offset: int | float = 0
    _sort_keys_cache: dict[Key, tuple[int, int, int, int, str]] = {}

    def sort_key(x: Key) -> tuple[int, int, int, int, str]:
        # Memoized tie-breaking key used throughout; cached because it is
        # computed many times per node during the walk.
        try:
            return _sort_keys_cache[x]
        except KeyError:
            assert dependencies is not None
            _sort_keys_cache[x] = rv = (
                total_dependencies[x],
                len(dependencies[x]),
                len(roots_connected[x]),
                -max_dependents[x],
                # Converting to str is actually faster than creating some
                # wrapper class and comparisons that come this far are
                # relatively rare so we prefer fast init over fast comparison
                str(x),
            )
            return rv

    def add_to_result(item: Key) -> None:
        # Assign the next priority to `item` (and, transitively, to any
        # single-dependent chain it unlocks), updating all bookkeeping.
        # Earlier versions recursed into this method but this could cause
        # recursion depth errors. This is the only reason for the while loop
        next_items = [item]
        nonlocal i
        nonlocal min_leaf_degree  # type: ignore[misc]
        while next_items:
            item = next_items.pop()
            runnable_hull.discard(item)
            reachable_hull.discard(item)
            leaf_nodes.discard(item)
            if item in result:
                continue

            # NOTE(review): recursion here looks shallow since stripped data
            # roots should not themselves require data tasks — confirm.
            while requires_data_task[item]:
                add_to_result(requires_data_task[item].pop())
            if return_stats:
                result[item] = Order(i, crit_path_counter - _crit_path_counter_offset)
            else:
                result[item] = i
            if item not in external_keys:
                i += 1
            if item in root_nodes:
                # Loading a root reduces the degree (number of still-missing
                # roots) of every leaf it connects to; keep the degree
                # buckets and the running minimum consistent.
                for leaf in leafs_connected[item]:
                    if leaf in leaf_nodes:
                        degree = leafs_degree.pop(leaf)
                        leafs_per_degree[degree].remove(leaf)

                        new_degree = degree - 1
                        if new_degree > 0:
                            if new_degree < min_leaf_degree:
                                min_leaf_degree = new_degree
                            leafs_per_degree[new_degree].add(leaf)
                            leafs_degree[leaf] = new_degree
                        elif not leafs_per_degree[degree]:
                            assert degree == min_leaf_degree
                            while min_leaf_degree != max_leaf_degree and (
                                min_leaf_degree not in leafs_per_degree
                                or not leafs_per_degree[min_leaf_degree]
                            ):
                                min_leaf_degree += 1

            # Note: This is a `set` and therefore this introduces a certain
            # randomness. However, this randomness should not have any impact on
            # the final result since the `process_runnable` should produce
            # equivalent results regardless of the order in which runnable is
            # populated (not identical but equivalent)
            for dep in dependents.get(item, ()):
                num_needed[dep] -= 1
                reachable_hull.add(dep)
                if not num_needed[dep]:
                    if len(dependents[item]) == 1:
                        next_items.append(dep)
                    else:
                        runnable.append(dep)

    def _with_offset(func: Callable[..., None]) -> Callable[..., None]:
        # This decorator is only used to reduce indentation levels. The offset
        # is purely cosmetical and used for some visualizations and I haven't
        # settled on how to implement this best so I didn't want to have large
        # indentations that make things harder to read

        def wrapper(*args: Any, **kwargs: Any) -> None:
            nonlocal _crit_path_counter_offset
            _crit_path_counter_offset = 0.5
            try:
                func(*args, **kwargs)
            finally:
                _crit_path_counter_offset = 0

        return wrapper

    @_with_offset
    def process_runnables() -> None:
        """Compute all currently runnable paths and either cache or execute them

        This is designed to ensure we are running tasks that are free to execute
        (e.g. the result of a splitter task) not too eagerly. If we executed
        such free tasks too early we'd be walking the graph in a too wide /
        breadth first fashion that is not optimal. If instead we were to only
        execute them once they are needed for a final result, this can cause
        very high memory pressure since valuable reducers are executed too
        late.

        The strategy here is to take all runnable tasks and walk forwards until
        we hit a reducer node (i.e. a node with more than one dependency). We
        will remember/cache the path to this reducer node.
        If this path leads to a leaf or if we find enough runnable paths for a
        reducer to be runnable, we will execute the path.

        If instead of a reducer a splitter is encountered that is runnable, we
        will follow its splitter paths individually and apply the same logic to
        each branch.
        """
        while runnable:
            candidates = runnable.copy()
            runnable.clear()
            while candidates:
                key = candidates.pop()
                if key in runnable_hull or key in result:
                    continue
                if key in leaf_nodes:
                    add_to_result(key)
                    continue
                path = [key]
                branches = deque([(0, path)])

                while branches:
                    nsplits, path = branches.popleft()
                    while True:
                        # Loop invariant. Too expensive to compute at runtime
                        # assert not set(known_runnable_paths).intersection(runnable_hull)
                        current = path[-1]
                        runnable_hull.add(current)
                        deps_downstream = dependents[current]
                        deps_upstream = dependencies[current]
                        if not deps_downstream:
                            # FIXME: The fact that it is possible for
                            # num_needed[current] == 0 means we're doing some
                            # work twice
                            if num_needed[current] <= 1:
                                for k in path:
                                    add_to_result(k)
                            else:
                                runnable_hull.discard(current)
                        elif len(path) == 1 or len(deps_upstream) == 1:
                            if len(deps_downstream) > 1:
                                nsplits += 1
                                for d in sorted(deps_downstream, key=sort_key):
                                    # This ensures we're only considering splitters
                                    # that are genuinely splitting and not
                                    # interleaving
                                    if len(dependencies[d]) == 1:
                                        branch = path.copy()
                                        branch.append(d)
                                        branches.append((nsplits, branch))
                                break
                            path.extend(deps_downstream)
                            continue
                        elif current in known_runnable_paths:
                            known_runnable_paths[current].append(path)
                            runnable_hull.discard(current)
                            if (
                                len(known_runnable_paths[current])
                                >= num_needed[current]
                            ):
                                pruned_branches: deque[list[Key]] = deque()
                                for path in known_runnable_paths_pop(current):
                                    if path[-2] not in result:
                                        pruned_branches.append(path)
                                if len(pruned_branches) < num_needed[current]:
                                    known_runnable_paths[current] = list(
                                        pruned_branches
                                    )
                                else:
                                    if nsplits > 1:
                                        path = []
                                        for pruned in pruned_branches:
                                            path.extend(pruned)
                                        branches.append((nsplits - 1, path))
                                        break

                                    while pruned_branches:
                                        path = pruned_branches.popleft()
                                        for k in path:
                                            if num_needed[k]:
                                                pruned_branches.append(path)
                                                break
                                            add_to_result(k)
                        elif (
                            len(dependencies[current]) > 1 and num_needed[current] <= 1
                        ):
                            for k in path:
                                add_to_result(k)
                        else:
                            known_runnable_paths[current] = [path]
                            runnable_hull.discard(current)
                        break

    # Pick strategy
    # Note: We're trying to be smart here by picking a strategy on how to
    # determine the critical path. This is not always clear and we may want to
    # consider just calculating both orderings and picking the one with less
    # pressure. The only concern to this would be performance but at time of
    # writing, the most expensive part of ordering is the prep work (mostly
    # connected roots + sort_key) which can be reused for multiple orderings.

    # Degree in this context is the number root nodes that have to be loaded for
    # this leaf to become accessible. Zero means the leaf is already accessible
    # in which case it _should_ either already be in result or be accessible via
    # process_runnables
    # When picking a new target, we prefer the leafs with the least number of
    # roots that need loading.
    leafs_degree = {}
    leafs_per_degree = defaultdict(set)
    min_leaf_degree = len(dsk)
    max_leaf_degree = len(dsk)
    for leaf in leaf_nodes - root_nodes:
        degree = len(roots_connected[leaf])
        min_leaf_degree = min(min_leaf_degree, degree)
        max_leaf_degree = max(max_leaf_degree, degree)
        leafs_degree[leaf] = degree
        leafs_per_degree[degree].add(leaf)

    def get_target() -> Key:
        # If we're already mid run and there is a runnable_hull we'll attempt to
        # pick the next target in a way that minimizes the number of additional
        # root nodes that are needed
        all_leafs_accessible = min_leaf_degree == max_leaf_degree
        is_trivial_lookup = not reachable_hull or all_leafs_accessible
        if not is_trivial_lookup:
            candidates = reachable_hull & leafs_per_degree[min_leaf_degree]
            if not candidates:
                candidates = leafs_per_degree[min_leaf_degree]
            # Even without reachable hull overlap this should be relatively
            # small so one full pass should be fine
            return min(candidates, key=sort_key)
        else:
            return leaf_nodes_sorted.pop()

    def use_longest_path() -> bool:
        # Heuristic: decide whether the pre-sorted leaf stack should serve
        # longest-path-first or quickest-path-first targets.
        size = 0
        # Heavy reducer / splitter topologies often benefit from a very
        # traditional critical path that expresses the longest chain of
        # tasks.
        if abs(len(root_nodes) - len(leaf_nodes)) / len(root_nodes) < 0.8:
            # If the graph stays about the same, we are checking for symmetry
            # and choose a "quickest path first" approach if the graph appears
            # to be asymmetrical
            for r in root_nodes:
                if not size:
                    size = len(leafs_connected[r])
                elif size != len(leafs_connected[r]):
                    return False

        return True

    # Some topologies benefit if the node with the most dependencies
    # is used as first choice, others benefit from the opposite.
    longest_path = use_longest_path()
    leaf_nodes_sorted = sorted(leaf_nodes, key=sort_key, reverse=not longest_path)

    # *************************************************************************
    # CORE ALGORITHM STARTS HERE
    #
    # 0. Nomenclature
    #
    # - roots: Nodes that have no dependencies (i.e. typically data producers)
    # - leafs: Nodes that have no dependents (i.e. user requested keys)
    # - critical_path: The strategic path through the graph.
    # - walking forwards: Starting from a root node we walk the graph as if we
    #   were to compute the individual nodes, i.e. along dependents
    # - walking backwards: Starting from a leaf node we walk the graph in
    #   reverse direction, i.e. along dependencies
    # - runnable: Nodes that are ready to be computed because all their
    #   dependencies are in result
    # - runnable_hull: Nodes that could be reached and executed without
    #   "walking back". This typically means that these are tasks than can be
    #   executed without loading additional data/executing additional root
    #   nodes
    # - reachable_hull: Nodes that are touching the result, i.e. all nodes in
    #   reachable_hull have at least one dependency in result
    #
    # A. Build the critical path
    #
    # To build the critical path we will use a provided `get_target` function
    # that returns a node that is anywhere in the graph, typically a leaf
    # node. This node is not required to be runnable. We will walk the graph
    # backwards, i.e. from leafs to roots and append nodes to the graph as we
    # go. The critical path is a
    # linear path in the graph. While this is a viable strategy, it is not
    # required for the critical path to be a classical "longest path" but it
    # can define any route through the graph that should be considered as top
    # priority.
    #
    # 1. Determine the target node by calling ``get_target`` and append the
    #    target to the critical path stack
    # 2. Take the _most valuable_ (max given a `sort_key`) of its dependents
    #    and append it to the critical path stack. This key is the new target.
    # 3. Repeat step 2 until we reach a node that has no dependencies and is
    #    therefore runnable
    #
    # B. Walk the critical path
    #
    # Only the first element of the critical path is an actually runnable node
    # and this is where we're starting the sort. Strategically, this is the
    # most important goal to achieve but since not all of the nodes are
    # immediately runnable we have to walk back and compute other nodes first
    # before we can unlock the critical path. This typically requires us also
    # to load more data / run more root tasks.
    # While walking the critical path we will also unlock non-critical tasks
    # that could be run but are not contributing to our strategic goal. Under
    # certain circumstances, those runnable tasks are allowed to be run right
    # away to reduce memory pressure. This is described in more detail in
    # `process_runnable`.
    # Given this, the algorithm is as follows:
    #
    # 1. Pop the first element of the critical path
    # 2a. If the node is already in the result, continue
    # 2b. If the node is not runnable, we will put it back on the stack and
    #     put all its dependencies on the stack and continue with step 1. This
    #     is what we refer to as "walking back"
    # 2c. Else, we add the node to the result
    # 3. If we previously had to walk back we will consider running
    #    non-critical tasks (by calling process_runnables)
    # 4a. If critical path is not empty, repeat step 1
    # 4b. Go back to A.) and build a new critical path given a new target that
    #     accounts for the already computed nodes.
    #
    # *************************************************************************

    critical_path: list[Key] = []
    cpath_append = critical_path.append
    scpath_add = scrit_path.add

    def path_append(item: Key) -> None:
        # Keep the critical-path stack and its membership set in sync.
        cpath_append(item)
        scpath_add(item)

    scpath_update = scrit_path.update
    cpath_extend = critical_path.extend

    def path_extend(items: Iterable[Key]) -> None:
        cpath_extend(items)
        scpath_update(items)

    cpath_pop = critical_path.pop
    scpath_discard = scrit_path.discard

    def path_pop() -> Key:
        item = cpath_pop()
        scpath_discard(item)
        return item

    while len(result) < expected_len:
        crit_path_counter += 1
        assert not critical_path
        assert not scrit_path

        # A. Build the critical path
        target = get_target()
        next_deps = dependencies[target]
        path_append(target)

        while next_deps:
            item = max(next_deps, key=sort_key)
            path_append(item)
            next_deps = dependencies[item].difference(result)
            path_extend(next_deps)

        # B. Walk the critical path

        walked_back = False
        while critical_path:
            item = path_pop()
            if item in result:
                continue
            if num_needed[item]:
                # Not runnable yet: push the item back and walk back along
                # its unfinished dependencies (cached runnable paths are
                # replayed instead of re-walked).
                path_append(item)
                deps = dependencies[item].difference(result)
                unknown: list[Key] = []
                known: list[Key] = []
                k_append = known.append
                uk_append = unknown.append
                for d in sorted(deps, key=sort_key):
                    if d in known_runnable_paths:
                        k_append(d)
                    else:
                        uk_append(d)
                if len(unknown) > 1:
                    walked_back = True

                for d in unknown:
                    path_append(d)
                for d in known:
                    for path in known_runnable_paths_pop(d):
                        path_extend(reversed(path))

                del deps
                continue
            else:
                if walked_back and len(runnable) < len(critical_path):
                    process_runnables()
                add_to_result(item)
        process_runnables()

    assert len(result) == expected_len
    # Artificial DataNodes added for external keys are not part of the
    # caller's graph; drop them before returning.
    for k in external_keys:
        del result[k]
    return result  # type: ignore

616 

617 

def _connecting_to_roots(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, frozenset[Key]], dict[Key, int]]:
    """Determine for every node which root nodes are connected to it (i.e.
    ancestors). If arguments of dependencies and dependents are switched, this
    can also be used to determine which leaf nodes are connected to which node
    (i.e. descendants).

    Also computes a weight that is defined as (cheaper to compute here)

    `max(len(dependents[k]) for k in connected_roots[key])`

    Notes
    -----
    Works bottom-up: a node is processed once all of its dependencies have
    been processed (tracked via ``num_needed``). Root sets are shared
    (aliased) between nodes whenever possible and deduplicated by content to
    keep memory bounded on dense graphs.
    """
    result = {}
    current = []
    num_needed = {k: len(v) for k, v in dependencies.items() if v}
    max_dependents = {}
    roots = set()
    for k, v in dependencies.items():
        if not v:
            # Note: Hashing the full keys is relatively expensive. Hashing
            # integers would be much faster so this could be sped up by just
            # introducing a counter here. However, the order algorithm is also
            # sometimes interested in the actual keys and the only way to
            # benefit from the speedup of using integers would be to convert
            # this back on demand which makes the code very hard to read.
            roots.add(k)
            result[k] = frozenset({k})
            deps = dependents[k]
            max_dependents[k] = len(deps)
            for child in deps:
                num_needed[child] -= 1
                if not num_needed[child]:
                    current.append(child)

    dedup_mapping: dict[frozenset[Key], frozenset[Key]] = {}
    while current:
        key = current.pop()
        if key in result:
            continue
        for parent in dependents[key]:
            num_needed[parent] -= 1
            if not num_needed[parent]:
                current.append(parent)
        # At some point, all the roots are the same, particularly for dense
        # graphs. We don't want to create new sets over and over again
        transitive_deps = []
        transitive_deps_ids = set()
        max_dependents_key = list()
        for child in dependencies[key]:
            r_child = result[child]
            # Dedup by object identity first: shared/aliased sets are common
            # and identity checks are much cheaper than set equality.
            if id(r_child) in transitive_deps_ids:
                continue
            transitive_deps.append(r_child)
            transitive_deps_ids.add(id(r_child))
            max_dependents_key.append(max_dependents[child])

        max_dependents[key] = max(max_dependents_key)
        if len(transitive_deps_ids) == 1:
            # Single distinct ancestor set: alias it instead of copying.
            result[key] = transitive_deps[0]
        else:
            d = transitive_deps[0]
            if all(tdeps.issubset(d) for tdeps in transitive_deps[1:]):
                result[key] = d
            else:
                res = set(d)
                for tdeps in transitive_deps[1:]:
                    res.update(tdeps)
                # frozenset is unfortunately triggering a copy. In the event of
                # a cache hit, this is wasted time but we can't hash the set
                # otherwise (unless we did it manually) and can therefore not
                # deduplicate without this copy
                frozen_res = frozenset(res)
                del res, tdeps
                try:
                    result[key] = dedup_mapping[frozen_res]
                except KeyError:
                    dedup_mapping[frozen_res] = frozen_res
                    result[key] = frozen_res
    del dedup_mapping

    # Roots report no ancestors for themselves; this overwrites the {k}
    # singleton that was only needed during propagation above.
    empty_set: frozenset[Key] = frozenset()
    for r in roots:
        result[r] = empty_set
    return result, max_dependents

703 

704 

def ndependencies(
    dependencies: Mapping[Key, set[Key]], dependents: Mapping[Key, set[Key]]
) -> tuple[dict[Key, int], dict[Key, int]]:
    """Number of total data elements on which this key depends

    For each key we return the number of tasks that must be run for us to run
    this task.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> num_dependencies, total_dependencies = ndependencies(dependencies, dependents)
    >>> sorted(total_dependencies.items())
    [('a', 1), ('b', 2), ('c', 3)]

    Returns
    -------
    num_dependencies: Dict[key, int]
        Direct dependency count per key.
    total_dependencies: Dict[key, int]
        Size of the full (transitive) task closure per key, counting the key
        itself.
    """
    # Direct fan-in per key; returned as-is and also used (as a mutable copy)
    # to drive the bottom-up traversal.
    num_dependencies = {key: len(deps) for key, deps in dependencies.items()}
    remaining = dict(num_dependencies)
    # Roots (no dependencies) have a closure of exactly themselves.
    totals = {key: 1 for key, count in num_dependencies.items() if not count}

    ready: list[Key] = []

    def _notify_parents(child: Key) -> None:
        # `child`'s total is final; any parent whose children are all done
        # becomes ready for processing.
        for parent in dependents[child]:
            remaining[parent] -= 1
            if not remaining[parent]:
                ready.append(parent)

    for key in totals:
        _notify_parents(key)
    while ready:
        key = ready.pop()
        totals[key] = 1 + sum(totals[child] for child in dependencies[key])
        _notify_parents(key)
    return num_dependencies, totals

752 

753 

# Per-key simulation record produced by ``diagnostics`` (see its docstring
# for the precise field semantics).
OrderInfo = namedtuple(
    "OrderInfo",
    (
        "order",  # position in which the node is run
        "age",  # how long the output of the node is held
        "num_data_when_run",  # outputs held in memory when the node runs
        "num_data_when_released",  # outputs held when this output is released
        "num_dependencies_freed",  # dependencies freed by running the node
    ),
)

764 

765 

def diagnostics(
    dsk: MutableMapping[Key, Any],
    o: Mapping[Key, int] | None = None,
    dependencies: MutableMapping[Key, set[Key]] | None = None,
) -> tuple[dict[Key, OrderInfo], list[int]]:
    """Simulate runtime metrics as though running tasks one at a time in order.

    These diagnostics can help reveal behaviors of and issues with ``order``.

    Returns a dict of `namedtuple("OrderInfo")` and a list of the number of outputs held over time.

    OrderInfo fields:
    - order : the order in which the node is run.
    - age : how long the output of a node is held.
    - num_data_when_run : the number of outputs held in memory when a node is run.
    - num_data_when_released : the number of outputs held in memory when the output is released.
    - num_dependencies_freed : the number of dependencies freed by running the node.
    """
    if dependencies is None:
        dependencies, dependents = get_deps(dsk)
    else:
        dependents = reverse_dict(dependencies)
    assert dependencies is not None
    if o is None:
        o = order(dsk, dependencies=dependencies, return_stats=False)

    pressure: list[int] = []
    held = 0  # number of task outputs currently resident in memory
    age = {}
    run_snapshot = {}  # memory pressure observed when each key runs
    release_snapshot = {}  # memory pressure observed when each output is freed
    freed_counts = {}
    # How many consumers still need each key's output.
    waiting_consumers = {key: len(deps) for key, deps in dependents.items()}

    # Replay the graph one task at a time in priority order.
    for step, key in enumerate(sorted(dsk, key=o.__getitem__)):
        pressure.append(held)
        run_snapshot[key] = held
        n_released = 0
        for dep in dependencies[key]:
            waiting_consumers[dep] -= 1
            if waiting_consumers[dep] == 0:
                # Last consumer just ran: the dependency's output is freed.
                age[dep] = step - o[dep]
                release_snapshot[dep] = held
                n_released += 1
        freed_counts[key] = n_released
        if dependents[key]:
            # This key's own output is now held; freed dependencies leave.
            held += 1 - n_released
        else:
            # Terminal output: released immediately after running.
            age[key] = 0
            release_snapshot[key] = held
            held -= n_released

    info = {
        key: OrderInfo(
            priority,
            age[key],
            run_snapshot[key],
            release_snapshot[key],
            freed_counts[key],
        )
        for key, priority in o.items()
    }
    return info, pressure

824 

825 

def _f() -> None: ...  # no-op placeholder callable used by ``sanitize_dsk``

828 

def sanitize_dsk(dsk: MutableMapping[Key, Any]) -> dict:
    """Take a dask graph and replace callables with a dummy function and remove
    payload data like numpy arrays, dataframes, etc.

    Only the key-to-key dependency topology of the graph is preserved.
    """
    from dask._task_spec import Task, TaskRef

    return {
        key: Task(key, _f, *(TaskRef(dep) for dep in deps))
        for key, deps in DependenciesMapping(dsk).items()
    }

839 return new