1""" 

2Asynchronous Shared-Memory Scheduler for Dask Graphs. 

3 

4This scheduler coordinates several workers to execute tasks in a dask graph in 

5parallel. It depends on a ``concurrent.futures.Executor`` 

6and a corresponding Queue for worker-to-scheduler communication. 

7 

8It tries to execute tasks in an order which maintains a small memory footprint 

9throughout execution. It does this by running tasks that allow us to release 

10data resources. 

11 

12 

13Task Selection Policy 

14===================== 

15 

16When we complete a task we add more data in to our set of available data; this 

17new data makes new tasks available. We preferentially choose tasks that were 

18just made available in a last-in-first-out fashion. We implement this as a 

19simple stack. This results in more depth-first rather than breadth first 

20behavior which encourages us to process batches of data to completion before 

21starting in on new data when possible. 

22 

23When the addition of new data readies multiple tasks simultaneously we add 

24tasks to the stack in sorted order so that tasks with greater keynames are run 

25first. This can be handy to break ties in a predictable fashion. 
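
A minimal sketch of this policy with hypothetical keys ('a' was already
ready; 'b' and 'c' have just been unlocked):

>>> ready = ['a']
>>> ready.extend(sorted({'b', 'c'}))  # push newly ready tasks in sorted order
>>> ready.pop()  # the most recently added (greatest) key runs next
'c'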


State
=====

Many functions pass around a ``state`` variable that holds the current state
of the computation. This variable consists of several other dictionaries and
sets, explained below.

Constant state
--------------

1. dependencies: {x: [a, b, c]} -- a, b, and c must be run before x
2. dependents: {a: [x, y]} -- a must run before x and y

Changing state
--------------

### Data

1. cache: available concrete data; {key: actual-data}
2. released: data that we've seen, used, and released because it is no longer
   needed

### Jobs

1. ready: a stack of ready-to-run tasks, used last-in-first-out
2. running: a set of tasks currently in execution
3. finished: a set of finished tasks
4. waiting: which tasks are still waiting on others :: {key: {keys}}
   Real-time equivalent of dependencies
5. waiting_data: which tasks still need each piece of data :: {key: {keys}}
   Real-time equivalent of dependents


Examples
--------

>>> import pprint  # doctest: +SKIP
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
>>> pprint.pprint(start_state_from_dask(dsk))  # doctest: +SKIP
{'cache': {'x': 1, 'y': 2},
 'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
 'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
 'finished': set(),
 'ready': ['z'],
 'released': set(),
 'running': set(),
 'waiting': {'w': {'z'}},
 'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}


Optimizations
=============

We build this scheduler with out-of-core array operations in mind. To this
end we have encoded some particular optimizations.

Compute to release data
-----------------------

When we choose a new task to execute we often have many options. Policies at
this stage are cheap and can significantly impact performance. One could
imagine policies that expose parallelism, drive towards a particular output,
etc.

Our current policy is to run tasks that were most recently made available.


Inlining computations
---------------------

We hold on to intermediate computations either in memory or on disk.

For very cheap computations that may emit new copies of the data, like
``np.transpose`` or possibly even ``x + 1``, we choose not to store these as
separate pieces of data / tasks. Instead we combine them with the
computations that require them. This may result in repeated computation but
saves significantly on space and computational complexity.

See the function ``inline_functions`` for more information.
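
A hedged sketch of inlining on a toy graph (``inline_functions`` lives in
``dask.optimization``; the graph here is illustrative):

>>> from dask.optimization import inline_functions  # doctest: +SKIP
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> dsk = {'x': 1, 'i': (inc, 'x'), 'out': (add, 'i', 'x')}  # doctest: +SKIP
>>> inline_functions(dsk, [], [inc])  # doctest: +SKIP
{'x': 1, 'out': (add, (inc, 'x'), 'x')}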

108""" 

from __future__ import annotations

import os
from collections import defaultdict
from collections.abc import Mapping, Sequence
from concurrent.futures import Executor, Future
from functools import partial
from queue import Empty, Queue

from dask import config
from dask._task_spec import DataNode, convert_legacy_graph
from dask.callbacks import local_callbacks, unpack_callbacks
from dask.core import flatten, get_dependencies
from dask.order import order
from dask.typing import Key


if os.name == "nt":
    # Python 3 Windows Queue.get doesn't handle interrupts properly. To
    # work around this we poll at an interval large enough that it shouldn't
    # affect performance, but small enough that users trying to kill an
    # application shouldn't care.
    def queue_get(q):
        while True:
            try:
                return q.get(block=True, timeout=0.1)
            except Empty:
                pass

else:

    def queue_get(q):
        return q.get()


def start_state_from_dask(dsk, cache=None, sortkey=None, keys=None):
    """Start state from a dask

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    """
    if sortkey is None:
        sortkey = order(dsk).get
    if cache is None:
        cache = config.get("cache", None)
    if cache is None:
        cache = dict()
    if keys is None:
        keys = list(set(dsk) - set(cache))
    dsk = convert_legacy_graph(dsk, all_keys=set(dsk) | set(cache))
    stack = list(keys)
    dependencies = defaultdict(set)
    dependents = defaultdict(set)
    waiting = defaultdict(set)
    waiting_data = defaultdict(set)
    ready_set = set()
    seen = set()
    while stack:
        key = stack.pop()
        if key in seen:
            continue
        seen.add(key)
        # Index into the defaultdicts so that every traversed key gets an
        # entry, even if it ends up with no dependents or dependencies.
        dependents[key]
        waiting_data[key]
        dependencies[key]
        task = dsk.get(key, None)
        if task is None:
            if dependents[key] and not cache.get(key, None):
                raise ValueError(
                    "Missing dependency {} for dependents {}".format(
                        key, dependents[key]
                    )
                )
            continue
        elif isinstance(task, DataNode):
            cache[key] = task()
            dependencies[key]
            for d in dependents[key]:
                if d in waiting:
                    waiting[d].remove(key)
                    if not waiting[d]:
                        del waiting[d]
                        ready_set.add(d)
                else:
                    ready_set.add(d)
        else:
            _wait = task.dependencies - set(cache)
            if not _wait:
                ready_set.add(key)
            else:
                waiting[key] = set(_wait)
            for dep in task.dependencies:
                dependencies[key].add(dep)
                dependents[dep].add(key)
                waiting_data[dep].add(key)
                stack.append(dep)

    ready = sorted(ready_set, key=sortkey, reverse=True)

    state = {
        "dependencies": dict(dependencies),
        "dependents": dict(dependents),
        "waiting": dict(waiting),
        "waiting_data": dict(waiting_data),
        "cache": cache,
        "ready": ready,
        "running": set(),
        "finished": set(),
        "released": set(),
    }

    return state


"""
Running tasks
-------------

When we execute tasks we both:

1. Perform the actual work of collecting the appropriate data and calling
   the function
2. Manage administrative state to coordinate with the scheduler
"""


def execute_task(key, task_info, dumps, loads, get_id, pack_exception):
    """
    Compute task and handle all administration

    See Also
    --------
    _execute_task : actually execute task
    """
    try:
        task, data = loads(task_info)
        result = task(data)
        id = get_id()
        result = dumps((result, id))
        failed = False
    except BaseException as e:  # noqa: B036
        result = pack_exception(e, dumps)
        failed = True
    return key, result, failed


def batch_execute_tasks(it):
    """
    Batch computing of multiple tasks with `execute_task`
    """
    return [execute_task(*a) for a in it]


def release_data(key, state, delete=True):
    """Remove data from temporary storage

    See Also
    --------
    finish_task
    """
    if key in state["waiting_data"]:
        assert not state["waiting_data"][key]
        del state["waiting_data"][key]

    state["released"].add(key)

    if delete:
        del state["cache"][key]


def finish_task(
    dsk, key, state, results, sortkey, delete=True, release_data=release_data
):
    """
    Update execution state after a task finishes

    Mutates. This should run atomically (with a lock).
    """
    # Move any dependents that were only waiting on this task onto the
    # ready stack, in sorted order.
    for dep in sorted(state["dependents"][key], key=sortkey, reverse=True):
        s = state["waiting"][dep]
        s.remove(key)
        if not s:
            del state["waiting"][dep]
            state["ready"].append(dep)

    # Release any input data that no remaining task needs, unless it is a
    # requested result.
    for dep in state["dependencies"][key]:
        if dep in state["waiting_data"]:
            s = state["waiting_data"][dep]
            s.remove(key)
            if not s and dep not in results:
                release_data(dep, state, delete=delete)
        elif delete and dep not in results:
            release_data(dep, state, delete=delete)

    state["finished"].add(key)
    state["running"].remove(key)

    return state


def nested_get(ind, coll):
    """Get nested index from collection

    Examples
    --------

    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    """
    if isinstance(ind, list):
        return tuple(nested_get(i, coll) for i in ind)
    else:
        return coll[ind]


def default_get_id():
    """Default get_id"""
    return None


def default_pack_exception(e, dumps):
    raise


def reraise(exc, tb=None):
    if exc.__traceback__ is not tb:
        raise exc.with_traceback(tb)
    raise exc


def identity(x):
    """Identity function. Returns x.

    >>> identity(3)
    3
    """
    return x


"""
Task Selection
--------------

We often have a choice among many tasks to run next. This choice is cheap to
make, yet it can significantly impact performance.

We currently select tasks that were most recently made ready. We hope that
this last-in-first-out policy reduces memory footprint.
"""

"""
`get`
-----

The main function and entry point of the scheduler.
"""


def get_async(
    submit,
    num_workers,
    dsk,
    result,
    cache=None,
    get_id=default_get_id,
    rerun_exceptions_locally=None,
    pack_exception=default_pack_exception,
    raise_exception=reraise,
    callbacks=None,
    dumps=identity,
    loads=identity,
    chunksize=None,
    **kwargs,
):
    """Asynchronous get function

    This is a general version of various asynchronous schedulers for dask. It
    takes a ``concurrent.futures.Executor.submit`` function to form a more
    specific ``get`` method that walks through the dask graph with parallel
    workers, avoiding repeat computation and minimizing memory use.

    Parameters
    ----------
    submit : function
        A ``concurrent.futures.Executor.submit`` function
    num_workers : int
        The number of workers that task submissions can be spread over
    dsk : dict
        A dask dictionary specifying a workflow
    result : key or list of keys
        Keys corresponding to desired data
    cache : dict-like, optional
        Temporary storage of results
    get_id : callable, optional
        Function to return the worker id, takes no arguments. Examples are
        `threading.current_thread` and `multiprocessing.current_process`.
    rerun_exceptions_locally : bool, optional
        Whether to rerun failing tasks in the local process to enable
        debugging (False by default)
    pack_exception : callable, optional
        Function to take an exception and ``dumps`` method, and return a
        serialized tuple of ``(exception, traceback)`` to send back to the
        scheduler. Default is to just raise the exception.
    raise_exception : callable, optional
        Function that takes an exception and a traceback, and raises an error.
    callbacks : tuple or list of tuples, optional
        Callbacks are passed in as tuples of length 5. Multiple sets of
        callbacks may be passed in as a list of tuples. For more information,
        see the dask.diagnostics documentation.
    dumps : callable, optional
        Function to serialize task data and results to communicate between
        worker and parent. Defaults to identity.
    loads : callable, optional
        Inverse function of `dumps`. Defaults to identity.
    chunksize : int, optional
        Size of chunks to use when dispatching work. Defaults to 1.
        If -1, will be computed to evenly divide ready work across workers.
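
    Examples
    --------
    A minimal sketch of wiring this up to a standard executor; the two-task
    graph and the worker count are purely illustrative:

    >>> from concurrent.futures import ThreadPoolExecutor  # doctest: +SKIP
    >>> inc = lambda x: x + 1
    >>> dsk = {'x': 1, 'y': (inc, 'x')}  # doctest: +SKIP
    >>> with ThreadPoolExecutor(max_workers=4) as pool:  # doctest: +SKIP
    ...     get_async(pool.submit, 4, dsk, 'y')
    2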

    See Also
    --------
    threaded.get
    """
    chunksize = chunksize or config.get("chunksize", 1)

    queue = Queue()

    if isinstance(result, list):
        result_flat = set(flatten(result))
    else:
        result_flat = {result}
    results = set(result_flat)

    if not isinstance(dsk, Mapping):
        dsk = dsk.__dask_graph__()
    dsk = convert_legacy_graph(dsk)
    with local_callbacks(callbacks) as callbacks:
        _, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
        started_cbs = []
        succeeded = False
        # if start_state_from_dask fails, we will have something
        # to pass to the final block.
        state = {}
        try:
            for cb in callbacks:
                if cb[0]:
                    cb[0](dsk)
                started_cbs.append(cb)

            keyorder = order(dsk)

            state = start_state_from_dask(
                dsk, keys=results, cache=cache, sortkey=keyorder.get
            )

            for _, start_state, _, _, _ in callbacks:
                if start_state:
                    start_state(dsk, state)

            if rerun_exceptions_locally is None:
                rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)

            if state["waiting"] and not state["ready"]:
                raise ValueError("Found no accessible jobs in dask")

            def fire_tasks(chunksize):
                """Fire off a task to the thread pool"""
                # Determine chunksize and/or number of tasks to submit
                nready = len(state["ready"])
                if chunksize == -1:
                    ntasks = nready
                    # Ceiling division: spread all ready tasks evenly
                    chunksize = -(ntasks // -num_workers)
                else:
                    # Ceiling division: number of workers already occupied
                    used_workers = -(len(state["running"]) // -chunksize)
                    avail_workers = max(num_workers - used_workers, 0)
                    ntasks = min(nready, chunksize * avail_workers)

                # Prep all ready tasks for submission
                args = []
                for _ in range(ntasks):
                    # Get the next task to compute (most recently added)
                    key = state["ready"].pop()
                    # Notify task is running
                    state["running"].add(key)
                    for f in pretask_cbs:
                        f(key, dsk, state)

                    # Prep args to send
                    data = {
                        dep: state["cache"][dep] for dep in state["dependencies"][key]
                    }
                    args.append(
                        (
                            key,
                            dumps((dsk[key], data)),
                            dumps,
                            loads,
                            get_id,
                            pack_exception,
                        )
                    )

                # Batch submit
                for i in range(-(len(args) // -chunksize)):
                    each_args = args[i * chunksize : (i + 1) * chunksize]
                    if not each_args:
                        break
                    fut = submit(batch_execute_tasks, each_args)
                    fut.add_done_callback(queue.put)

            # Main loop, wait on tasks to finish, insert new ones
            while state["waiting"] or state["ready"] or state["running"]:
                fire_tasks(chunksize)
                for key, res_info, failed in queue_get(queue).result():
                    if failed:
                        exc, tb = loads(res_info)
                        if rerun_exceptions_locally:
                            data = {
                                dep: state["cache"][dep]
                                for dep in get_dependencies(dsk, key)
                            }
                            task = dsk[key]
                            task(data)  # Re-execute locally
                        else:
                            raise_exception(exc, tb)
                    res, worker_id = loads(res_info)
                    state["cache"][key] = res
                    finish_task(dsk, key, state, results, keyorder.get)
                    for f in posttask_cbs:
                        f(key, res, dsk, state, worker_id)

            succeeded = True

        finally:
            for _, _, _, _, finish in started_cbs:
                if finish:
                    finish(dsk, state, not succeeded)

    return nested_get(result, state["cache"])

566""" Synchronous concrete version of get_async 

567 

568Usually we supply a ``concurrent.futures.Executor``. Here we provide a 

569sequential one. This is useful for debugging and for code dominated by the 

570GIL 

571""" 


class SynchronousExecutor(Executor):
    _max_workers = 1

    def submit(self, fn, *args, **kwargs):
        fut = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except BaseException as e:  # noqa: B036
            fut.set_exception(e)
        return fut


synchronous_executor = SynchronousExecutor()


def get_sync(dsk: Mapping, keys: Sequence[Key] | Key, **kwargs):
    """A naive synchronous version of get_async

    Can be useful for debugging.
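
    Examples
    --------
    A tiny sketch with an illustrative two-task graph:

    >>> inc = lambda x: x + 1
    >>> dsk = {'x': 1, 'y': (inc, 'x')}  # doctest: +SKIP
    >>> get_sync(dsk, 'y')  # doctest: +SKIP
    2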

593 """ 

594 kwargs.pop("num_workers", None) # if num_workers present, remove it 

595 return get_async( 

596 synchronous_executor.submit, 

597 synchronous_executor._max_workers, 

598 dsk, 

599 keys, 

600 **kwargs, 

601 ) 


""" Adaptor for ``multiprocessing.Pool`` instances

Usually we supply a ``concurrent.futures.Executor``. Here we provide a wrapper
class for ``multiprocessing.Pool`` instances so we can treat them like
``concurrent.futures.Executor`` instances instead.

This is mainly useful for legacy use cases or for users who prefer
``multiprocessing.Pool``.
"""


class MultiprocessingPoolExecutor(Executor):
    def __init__(self, pool):
        self.pool = pool
        self._max_workers = len(pool._pool)

    def submit(self, fn, *args, **kwargs):
        return submit_apply_async(self.pool.apply_async, fn, *args, **kwargs)


def submit_apply_async(apply_async, fn, *args, **kwargs):
    fut = Future()
    apply_async(fn, args, kwargs, fut.set_result, fut.set_exception)
    return fut


def get_apply_async(apply_async, num_workers, *args, **kwargs):
    return get_async(
        partial(submit_apply_async, apply_async), num_workers, *args, **kwargs
    )
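
# A hedged usage sketch (the graph and pool size are illustrative): adapt a
# ``multiprocessing.Pool`` by handing its ``apply_async`` method to
# ``get_apply_async``::
#
#     from multiprocessing import Pool
#     with Pool(2) as pool:
#         get_apply_async(pool.apply_async, 2, {'x': 1}, 'x')  # returns 1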


def sortkey(item):
    """Sorting key function that is robust to different types

    Both strings and tuples are common key types in dask graphs.
    However, in Python 3 one cannot compare strings with tuples directly.
    This function maps many types to a form where they can be compared.

    Examples
    --------
    >>> sortkey('Hello')
    ('str', 'Hello')

    >>> sortkey(('x', 1))
    ('tuple', ('x', 1))
    """
    return (type(item).__name__, item)