1""" 

2Asynchronous Shared-Memory Scheduler for Dask Graphs. 

3 

4This scheduler coordinates several workers to execute tasks in a dask graph in 

5parallel. It depends on a ``concurrent.futures.Executor`` 

6and a corresponding Queue for worker-to-scheduler communication. 

7 

8It tries to execute tasks in an order which maintains a small memory footprint 

9throughout execution. It does this by running tasks that allow us to release 

10data resources. 

11 

12 

13Task Selection Policy 

14===================== 

15 

16When we complete a task we add more data in to our set of available data; this 

17new data makes new tasks available. We preferentially choose tasks that were 

18just made available in a last-in-first-out fashion. We implement this as a 

19simple stack. This results in more depth-first rather than breadth first 

20behavior which encourages us to process batches of data to completion before 

21starting in on new data when possible. 

22 

23When the addition of new data readies multiple tasks simultaneously we add 

24tasks to the stack in sorted order so that tasks with greater keynames are run 

25first. This can be handy to break ties in a predictable fashion. 

26 

27 

28State 

29===== 

30 

31Many functions pass around a ``state`` variable that holds the current state of 

32the computation. This variable consists of several other dictionaries and 

33sets, explained below. 

34 

35Constant state 

36-------------- 

37 

381. dependencies: {x: [a, b ,c]} a,b,c, must be run before x 

392. dependents: {a: [x, y]} a must run before x or y 

40 

41Changing state 

42-------------- 

43 

44### Data 

45 

461. cache: available concrete data. {key: actual-data} 

472. released: data that we've seen, used, and released because it is no longer 

48 needed 

49 

50### Jobs 

51 

521. ready: A fifo stack of ready-to-run tasks 

532. running: A set of tasks currently in execution 

543. finished: A set of finished tasks 

554. waiting: which tasks are still waiting on others :: {key: {keys}} 

56 Real-time equivalent of dependencies 

575. waiting_data: available data to yet-to-be-run-tasks :: {key: {keys}} 

58 Real-time equivalent of dependents 

59 

60 

61Examples 

62-------- 

63 

64>>> import pprint # doctest: +SKIP 

65>>> inc = lambda x: x + 1 

66>>> add = lambda x, y: x + y 

67>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')} # doctest: +SKIP 

68>>> pprint.pprint(start_state_from_dask(dsk)) # doctest: +SKIP 

69{'cache': {'x': 1, 'y': 2}, 

70 'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}}, 

71 'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}), 

72 'finished': set(), 

73 'ready': ['z'], 

74 'released': set(), 

75 'running': set(), 

76 'waiting': {'w': {'z'}}, 

77 'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}} 

78 

79Optimizations 

80============= 

81 

82We build this scheduler with out-of-core array operations in mind. To this end 

83we have encoded some particular optimizations. 

84 

85Compute to release data 

86----------------------- 

87 

88When we choose a new task to execute we often have many options. Policies at 

89this stage are cheap and can significantly impact performance. One could 

90imagine policies that expose parallelism, drive towards a particular output, 

91etc.. 

92 

93Our current policy is to run tasks that were most recently made available. 

94 

95 

96Inlining computations 

97--------------------- 

98 

99We hold on to intermediate computations either in memory or on disk. 

100 

101For very cheap computations that may emit new copies of the data, like 

102``np.transpose`` or possibly even ``x + 1`` we choose not to store these as 

103separate pieces of data / tasks. Instead we combine them with the computations 

104that require them. This may result in repeated computation but saves 

105significantly on space and computation complexity. 

106 

107See the function ``inline_functions`` for more information. 

108""" 

from __future__ import annotations

import os
from collections import defaultdict
from collections.abc import Mapping, Sequence
from concurrent.futures import Executor, Future
from functools import partial
from queue import Empty, Queue

from dask import config
from dask._task_spec import DataNode, convert_legacy_graph
from dask.callbacks import local_callbacks, unpack_callbacks
from dask.core import flatten, get_dependencies
from dask.order import order
from dask.typing import Key

if os.name == "nt":
    # On Windows, ``Queue.get`` doesn't handle interrupts properly. To work
    # around this we poll at a sufficiently large interval that it shouldn't
    # affect performance, but small enough that users trying to kill an
    # application shouldn't care.
    def queue_get(q):
        while True:
            try:
                return q.get(block=True, timeout=0.1)
            except Empty:
                pass

else:

    def queue_get(q):
        return q.get()
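

# A minimal usage sketch of ``queue_get`` (the ``_demo_queue_get`` helper is
# hypothetical and exists only for illustration): both variants block until an
# item is available; the Windows variant merely polls so Ctrl-C still works.
def _demo_queue_get():
    q = Queue()
    q.put("done")  # the scheduler puts completed Futures on such a queue
    assert queue_get(q) == "done"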


def start_state_from_dask(dsk, cache=None, sortkey=None, keys=None):
    """Start state from a dask

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    """
    if sortkey is None:
        sortkey = order(dsk).get
    if cache is None:
        cache = config.get("cache", None)
    if cache is None:
        cache = dict()
    if keys is None:
        keys = list(set(dsk) - set(cache))
    dsk = convert_legacy_graph(dsk, all_keys=set(dsk) | set(cache))
    stack = list(keys)
    dependencies = defaultdict(set)
    dependents = defaultdict(set)
    waiting = defaultdict(set)
    waiting_data = defaultdict(set)
    ready_set = set()
    seen = set()
    while stack:
        key = stack.pop()
        if key in seen:
            continue
        seen.add(key)
        # Touch the defaultdicts so every traversed key gets an entry
        dependents[key]
        waiting_data[key]
        dependencies[key]
        task = dsk.get(key, None)
        if task is None:
            if dependents[key] and not cache.get(key):
                raise ValueError(
                    f"Missing dependency {key} for dependents {dependents[key]}"
                )
            continue
        elif isinstance(task, DataNode):
            cache[key] = task()
            dependencies[key]
            for d in dependents[key]:
                if d in waiting:
                    waiting[d].remove(key)
                    if not waiting[d]:
                        del waiting[d]
                        ready_set.add(d)
                else:
                    ready_set.add(d)
        else:
            _wait = task.dependencies - set(cache)
            if not _wait:
                ready_set.add(key)
            else:
                waiting[key] = set(_wait)
            for dep in task.dependencies:
                dependencies[key].add(dep)
                dependents[dep].add(key)
                waiting_data[dep].add(key)
                stack.append(dep)

    ready = sorted(ready_set, key=sortkey, reverse=True)

    state = {
        "dependencies": dict(dependencies),
        "dependents": dict(dependents),
        "waiting": dict(waiting),
        "waiting_data": dict(waiting_data),
        "cache": cache,
        "ready": ready,
        "running": set(),
        "finished": set(),
        "released": set(),
    }

    return state
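

# A minimal sketch of building the initial state (the ``_demo_start_state``
# helper is hypothetical, for illustration only). Mirroring the docstring
# example above: 'z' is the only ready task since 'w' still waits on it, and
# the concrete values 'x' and 'y' are seeded into the cache.
def _demo_start_state():
    inc = lambda x: x + 1
    add = lambda x, y: x + y
    dsk = {"x": 1, "y": 2, "z": (inc, "x"), "w": (add, "z", "y")}
    state = start_state_from_dask(dsk)
    assert state["ready"] == ["z"]
    assert state["waiting"] == {"w": {"z"}}
    assert state["cache"] == {"x": 1, "y": 2}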

235""" 

236Running tasks 

237------------- 

238 

239When we execute tasks we both 

240 

2411. Perform the actual work of collecting the appropriate data and calling the function 

2422. Manage administrative state to coordinate with the scheduler 

243""" 


def execute_task(key, task_info, dumps, loads, get_id, pack_exception):
    """
    Compute task and handle all administration

    See Also
    --------
    _execute_task : actually execute task
    """
    try:
        task, data = loads(task_info)
        result = task(data)
        id = get_id()
        result = dumps((result, id))
        failed = False
    except BaseException as e:
        result = pack_exception(e, dumps)
        failed = True
    return key, result, failed
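

# A round-trip sketch of ``execute_task`` with the default identity
# ``dumps``/``loads`` (the ``_demo_execute_task`` helper and its hand-built
# callable task are hypothetical, for illustration only). On success it
# returns ``(key, (result, worker_id), False)``.
def _demo_execute_task():
    task = lambda data: data["x"] + 1  # stands in for a runnable dask task
    key, payload, failed = execute_task(
        "y",
        (task, {"x": 1}),
        identity,
        identity,
        default_get_id,
        default_pack_exception,
    )
    assert (key, payload, failed) == ("y", (2, None), False)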


def batch_execute_tasks(it):
    """
    Batch computing of multiple tasks with `execute_task`
    """
    return [execute_task(*a) for a in it]


def release_data(key, state, delete=True):
    """Remove data from temporary storage

    See Also
    --------
    finish_task
    """
    if key in state["waiting_data"]:
        assert not state["waiting_data"][key]
        del state["waiting_data"][key]

    state["released"].add(key)

    if delete:
        del state["cache"][key]


def finish_task(
    dsk, key, state, results, sortkey, delete=True, release_data=release_data
):
    """
    Update execution state after a task finishes

    Mutates. This should run atomically (with a lock).
    """
    for dep in sorted(state["dependents"][key], key=sortkey, reverse=True):
        s = state["waiting"][dep]
        s.remove(key)
        if not s:
            del state["waiting"][dep]
            state["ready"].append(dep)

    for dep in state["dependencies"][key]:
        if dep in state["waiting_data"]:
            s = state["waiting_data"][dep]
            s.remove(key)
            if not s and dep not in results:
                release_data(dep, state, delete=delete)
        elif delete and dep not in results:
            release_data(dep, state, delete=delete)

    state["finished"].add(key)
    state["running"].remove(key)

    return state
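

# A sketch of one scheduler step driven by hand (the ``_demo_finish_task``
# helper is hypothetical, for illustration only): pretend the ready task 'z'
# just ran, then let ``finish_task`` promote its dependent 'w' onto the ready
# stack and release 'x', which no remaining task needs.
def _demo_finish_task():
    inc = lambda x: x + 1
    add = lambda x, y: x + y
    dsk = {"x": 1, "y": 2, "z": (inc, "x"), "w": (add, "z", "y")}
    state = start_state_from_dask(dsk)
    key = state["ready"].pop()  # 'z'
    state["running"].add(key)
    state["cache"][key] = 2  # pretend a worker computed inc(1)
    finish_task(dsk, key, state, results={"w"}, sortkey=str)
    assert state["ready"] == ["w"]
    assert "x" not in state["cache"]  # released: no remaining dependents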


def nested_get(ind, coll):
    """Get nested index from collection

    Examples
    --------

    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    """
    if isinstance(ind, list):
        return tuple(nested_get(i, coll) for i in ind)
    else:
        return coll[ind]


def default_get_id():
    """Default get_id"""
    return None


def default_pack_exception(e, dumps):
    # By default we do not serialize exceptions; re-raise on the worker and
    # let its Future surface the error to the scheduler.
    raise e


def reraise(exc, tb=None):
    if exc.__traceback__ is not tb:
        raise exc.with_traceback(tb)
    raise exc


def identity(x):
    """Identity function. Returns x.

    >>> identity(3)
    3
    """
    return x


"""
Task Selection
--------------

We often have a choice among many tasks to run next. This choice is cheap to
make, and it can significantly impact performance.

We currently select tasks that were most recently made ready, hoping that
this last-in-first-out policy reduces memory footprint.
"""

"""
`get`
-----

The main entry point of the scheduler.
"""


def get_async(
    submit,
    num_workers,
    dsk,
    result,
    cache=None,
    get_id=default_get_id,
    rerun_exceptions_locally=None,
    pack_exception=default_pack_exception,
    raise_exception=reraise,
    callbacks=None,
    dumps=identity,
    loads=identity,
    chunksize=None,
    **kwargs,
):
    """Asynchronous get function

    This is a general version of various asynchronous schedulers for dask. It
    takes a ``concurrent.futures.Executor.submit`` function to form a more
    specific ``get`` method that walks through the dask graph with parallel
    workers, avoiding repeat computation and minimizing memory use.

    Parameters
    ----------
    submit : function
        A ``concurrent.futures.Executor.submit`` function
    num_workers : int
        The number of workers that task submissions can be spread over
    dsk : dict
        A dask dictionary specifying a workflow
    result : key or list of keys
        Keys corresponding to desired data
    cache : dict-like, optional
        Temporary storage of results
    get_id : callable, optional
        Function to return the worker id, takes no arguments. Examples are
        `threading.current_thread` and `multiprocessing.current_process`.
    rerun_exceptions_locally : bool, optional
        Whether to rerun failing tasks in local process to enable debugging
        (False by default)
    pack_exception : callable, optional
        Function to take an exception and ``dumps`` method, and return a
        serialized tuple of ``(exception, traceback)`` to send back to the
        scheduler. Default is to just raise the exception.
    raise_exception : callable, optional
        Function that takes an exception and a traceback, and raises an error.
    callbacks : tuple or list of tuples, optional
        Callbacks are passed in as tuples of length 5. Multiple sets of
        callbacks may be passed in as a list of tuples. For more information,
        see the dask.diagnostics documentation.
    dumps : callable, optional
        Function to serialize task data and results to communicate between
        worker and parent. Defaults to identity.
    loads : callable, optional
        Inverse function of `dumps`. Defaults to identity.
    chunksize : int, optional
        Size of chunks to use when dispatching work. Defaults to 1.
        If -1, will be computed to evenly divide ready work across workers.

    See Also
    --------
    threaded.get
    """
    chunksize = chunksize or config.get("chunksize", 1)

    queue = Queue()

    if isinstance(result, list):
        result_flat = set(flatten(result))
    else:
        result_flat = {result}
    results = set(result_flat)

    if not isinstance(dsk, Mapping):
        dsk = dsk.__dask_graph__()
    dsk = convert_legacy_graph(dsk)
    with local_callbacks(callbacks) as callbacks:
        _, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
        started_cbs = []
        succeeded = False
        # if start_state_from_dask fails, we will have something
        # to pass to the final block.
        state = {}
        try:
            for cb in callbacks:
                if cb[0]:
                    cb[0](dsk)
                started_cbs.append(cb)

            keyorder = order(dsk)

            state = start_state_from_dask(
                dsk, keys=results, cache=cache, sortkey=keyorder.get
            )

            for _, start_state, _, _, _ in callbacks:
                if start_state:
                    start_state(dsk, state)

            if rerun_exceptions_locally is None:
                rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)

            if state["waiting"] and not state["ready"]:
                raise ValueError("Found no accessible jobs in dask")

            def fire_tasks(chunksize):
                """Fire off ready tasks to the executor"""
                # Determine chunksize and/or number of tasks to submit
                nready = len(state["ready"])
                if chunksize == -1:
                    ntasks = nready
                    chunksize = -(ntasks // -num_workers)
                else:
                    used_workers = -(len(state["running"]) // -chunksize)
                    avail_workers = max(num_workers - used_workers, 0)
                    ntasks = min(nready, chunksize * avail_workers)

                # Prep all ready tasks for submission
                args = []
                for _ in range(ntasks):
                    # Get the next task to compute (most recently added)
                    key = state["ready"].pop()
                    # Notify task is running
                    state["running"].add(key)
                    for f in pretask_cbs:
                        f(key, dsk, state)

                    # Prep args to send
                    data = {
                        dep: state["cache"][dep] for dep in state["dependencies"][key]
                    }
                    args.append(
                        (
                            key,
                            dumps((dsk[key], data)),
                            dumps,
                            loads,
                            get_id,
                            pack_exception,
                        )
                    )

                # Batch submit
                for i in range(-(len(args) // -chunksize)):
                    each_args = args[i * chunksize : (i + 1) * chunksize]
                    if not each_args:
                        break
                    fut = submit(batch_execute_tasks, each_args)
                    fut.add_done_callback(queue.put)

            # Main loop, wait on tasks to finish, insert new ones
            while state["waiting"] or state["ready"] or state["running"]:
                fire_tasks(chunksize)
                for key, res_info, failed in queue_get(queue).result():
                    if failed:
                        exc, tb = loads(res_info)
                        if rerun_exceptions_locally:
                            data = {
                                dep: state["cache"][dep]
                                for dep in get_dependencies(dsk, key)
                            }
                            task = dsk[key]
                            task(data)  # Re-execute locally
                        else:
                            raise_exception(exc, tb)
                    res, worker_id = loads(res_info)
                    state["cache"][key] = res
                    finish_task(dsk, key, state, results, keyorder.get)
                    for f in posttask_cbs:
                        f(key, res, dsk, state, worker_id)

            succeeded = True

        finally:
            for _, _, _, _, finish in started_cbs:
                if finish:
                    finish(dsk, state, not succeeded)

    return nested_get(result, state["cache"])
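

# An end-to-end sketch of ``get_async`` with a real executor (the
# ``_demo_get_async`` helper is hypothetical, for illustration only): anything
# exposing a ``concurrent.futures.Executor.submit``-style function will do.
def _demo_get_async():
    from concurrent.futures import ThreadPoolExecutor

    inc = lambda x: x + 1
    add = lambda x, y: x + y
    dsk = {"x": 1, "y": (inc, "x"), "z": (add, "x", "y")}
    with ThreadPoolExecutor(max_workers=2) as pool:
        assert get_async(pool.submit, 2, dsk, "z") == 3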


""" Synchronous concrete version of get_async

Usually we supply a ``concurrent.futures.Executor``. Here we provide a
sequential one. This is useful for debugging and for code dominated by the
GIL.
"""


class SynchronousExecutor(Executor):
    _max_workers = 1

    def submit(self, fn, *args, **kwargs):
        fut = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except BaseException as e:
            fut.set_exception(e)
        return fut


synchronous_executor = SynchronousExecutor()
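

# A minimal sketch (the ``_demo_synchronous_executor`` helper is hypothetical,
# for illustration only): ``submit`` runs the function eagerly in the calling
# thread and hands back an already-completed Future, so the rest of the
# scheduler machinery works unchanged.
def _demo_synchronous_executor():
    fut = synchronous_executor.submit(sum, [1, 2, 3])
    assert fut.done() and fut.result() == 6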


def get_sync(dsk: Mapping, keys: Sequence[Key] | Key, **kwargs):
    """A naive synchronous version of get_async

    Can be useful for debugging.
    """
    kwargs.pop("num_workers", None)  # if num_workers present, remove it
    return get_async(
        synchronous_executor.submit,
        synchronous_executor._max_workers,
        dsk,
        keys,
        **kwargs,
    )
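

# A usage sketch for ``get_sync`` (the ``_demo_get_sync`` helper is
# hypothetical, for illustration only): everything runs sequentially in the
# calling thread, which makes stepping through with a debugger easy.
def _demo_get_sync():
    inc = lambda x: x + 1
    dsk = {"x": 1, "y": (inc, "x")}
    assert get_sync(dsk, "y") == 2
    assert get_sync(dsk, ["x", "y"]) == (1, 2)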


""" Adaptor for ``multiprocessing.Pool`` instances

Usually we supply a ``concurrent.futures.Executor``. Here we provide a wrapper
class for ``multiprocessing.Pool`` instances so we can treat them like
``concurrent.futures.Executor`` instances instead.

This is mainly useful for legacy use cases or users that prefer
``multiprocessing.Pool``.
"""


class MultiprocessingPoolExecutor(Executor):
    def __init__(self, pool):
        self.pool = pool
        self._max_workers = len(pool._pool)

    def submit(self, fn, *args, **kwargs):
        return submit_apply_async(self.pool.apply_async, fn, *args, **kwargs)


def submit_apply_async(apply_async, fn, *args, **kwargs):
    fut = Future()
    apply_async(fn, args, kwargs, fut.set_result, fut.set_exception)
    return fut


def get_apply_async(apply_async, num_workers, *args, **kwargs):
    return get_async(
        partial(submit_apply_async, apply_async), num_workers, *args, **kwargs
    )
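

# A sketch of driving the scheduler through ``multiprocessing.Pool.apply_async``
# (the ``_demo_get_apply_async`` helper is hypothetical, for illustration
# only). Note the assumption: with the default identity ``dumps``/``loads``,
# tasks and results cross process boundaries via pickle, so we use
# ``operator.add`` rather than an unpicklable lambda.
def _demo_get_apply_async():
    from multiprocessing import Pool
    from operator import add

    dsk = {"x": 1, "y": 2, "z": (add, "x", "y")}
    with Pool(2) as pool:
        assert get_apply_async(pool.apply_async, 2, dsk, "z") == 3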


def sortkey(item):
    """Sorting key function that is robust to different types

    Both strings and tuples are common key types in dask graphs.
    However, in Python 3 one cannot compare strings with tuples directly.
    This function maps many types to a form where they can be compared.

    Examples
    --------
    >>> sortkey('Hello')
    ('str', 'Hello')

    >>> sortkey(('x', 1))
    ('tuple', ('x', 1))
    """
    return (type(item).__name__, item)
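

# A minimal sketch (the ``_demo_sortkey`` helper is hypothetical, for
# illustration only): mixed key types sort without a TypeError because items
# are grouped by type name first ('str' < 'tuple'), then compared within each
# group.
def _demo_sortkey():
    keys = [("x", 1), "x", "a", ("x", 0)]
    assert sorted(keys, key=sortkey) == ["a", "x", ("x", 0), ("x", 1)]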