Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/local.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Asynchronous Shared-Memory Scheduler for Dask Graphs.
4This scheduler coordinates several workers to execute tasks in a dask graph in
5parallel. It depends on a ``concurrent.futures.Executor``
6and a corresponding Queue for worker-to-scheduler communication.
8It tries to execute tasks in an order which maintains a small memory footprint
9throughout execution. It does this by running tasks that allow us to release
10data resources.
13Task Selection Policy
14=====================
16When we complete a task we add more data into our set of available data; this
17new data makes new tasks available. We preferentially choose tasks that were
18just made available in a last-in-first-out fashion. We implement this as a
19simple stack. This results in more depth-first rather than breadth-first
20behavior which encourages us to process batches of data to completion before
21starting in on new data when possible.
23When the addition of new data readies multiple tasks simultaneously we add
24tasks to the stack in sorted order so that tasks with greater keynames are run
25first. This can be handy to break ties in a predictable fashion.
28State
29=====
31Many functions pass around a ``state`` variable that holds the current state of
32the computation. This variable consists of several other dictionaries and
33sets, explained below.
35Constant state
36--------------
381. dependencies: {x: [a, b ,c]} a,b,c, must be run before x
392. dependents: {a: [x, y]} a must run before x or y
41Changing state
42--------------
44### Data
461. cache: available concrete data. {key: actual-data}
472. released: data that we've seen, used, and released because it is no longer
48 needed
50### Jobs
521. ready: A LIFO stack of ready-to-run tasks
532. running: A set of tasks currently in execution
543. finished: A set of finished tasks
554. waiting: which tasks are still waiting on others :: {key: {keys}}
56 Real-time equivalent of dependencies
575. waiting_data: available data to yet-to-be-run-tasks :: {key: {keys}}
58 Real-time equivalent of dependents
61Examples
62--------
64>>> import pprint # doctest: +SKIP
65>>> inc = lambda x: x + 1
66>>> add = lambda x, y: x + y
67>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')} # doctest: +SKIP
68>>> pprint.pprint(start_state_from_dask(dsk)) # doctest: +SKIP
69{'cache': {'x': 1, 'y': 2},
70 'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
71 'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
72 'finished': set(),
73 'ready': ['z'],
74 'released': set(),
75 'running': set(),
76 'waiting': {'w': {'z'}},
77 'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
79Optimizations
80=============
82We build this scheduler with out-of-core array operations in mind. To this end
83we have encoded some particular optimizations.
85Compute to release data
86-----------------------
88When we choose a new task to execute we often have many options. Policies at
89this stage are cheap and can significantly impact performance. One could
90imagine policies that expose parallelism, drive towards a particular output,
91etc..
93Our current policy is to run tasks that were most recently made available.
96Inlining computations
97---------------------
99We hold on to intermediate computations either in memory or on disk.
101For very cheap computations that may emit new copies of the data, like
102``np.transpose`` or possibly even ``x + 1`` we choose not to store these as
103separate pieces of data / tasks. Instead we combine them with the computations
104that require them. This may result in repeated computation but saves
105significantly on space and computation complexity.
107See the function ``inline_functions`` for more information.
108"""
110from __future__ import annotations
112import os
113from collections import defaultdict
114from collections.abc import Mapping, Sequence
115from concurrent.futures import Executor, Future
116from functools import partial
117from queue import Empty, Queue
119from dask import config
120from dask._task_spec import DataNode, convert_legacy_graph
121from dask.callbacks import local_callbacks, unpack_callbacks
122from dask.core import flatten, get_dependencies
123from dask.order import order
124from dask.typing import Key
if os.name != "nt":

    def queue_get(q):
        """Block until an item is available on ``q`` and return it."""
        return q.get()

else:
    # On Windows a blocking ``Queue.get`` does not handle interrupts
    # properly.  Poll with a short timeout instead: long enough not to hurt
    # performance, short enough that a user trying to kill the application
    # will not notice the delay.
    def queue_get(q):
        """Return the next item from ``q``, polling every 0.1 seconds."""
        while True:
            try:
                item = q.get(block=True, timeout=0.1)
            except Empty:
                continue
            return item
def start_state_from_dask(dsk, cache=None, sortkey=None, keys=None):
    """Build the initial scheduler state for a dask graph

    Walks the graph from ``keys`` towards its inputs and records, for every
    key reached, what it depends on, what depends on it, and whether it can
    run immediately.

    Parameters
    ----------
    dsk : dict
        The dask graph.  It is passed through ``convert_legacy_graph``.
    cache : dict-like, optional
        Already available data, ``{key: value}``.  Falls back to the
        ``"cache"`` config value and then to a fresh ``dict``.  Values of
        ``DataNode`` tasks found in the graph are written into it.
    sortkey : callable, optional
        Key function used to sort the ready tasks.  Defaults to
        ``order(dsk).get``.
    keys : iterable of keys, optional
        Keys from which to start the traversal.  Defaults to every key of
        ``dsk`` that is not already in ``cache``.

    Returns
    -------
    state : dict
        A dict with the entries ``dependencies``, ``dependents``,
        ``waiting``, ``waiting_data``, ``cache``, ``ready``, ``running``,
        ``finished`` and ``released``.

    Raises
    ------
    ValueError
        If a task depends on a key that is neither in the graph nor in the
        cache.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}},
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    """
    if sortkey is None:
        sortkey = order(dsk).get
    if cache is None:
        cache = config.get("cache", None)
    if cache is None:
        cache = dict()
    if keys is None:
        keys = list(set(dsk) - set(cache))
    dsk = convert_legacy_graph(dsk, all_keys=set(dsk) | set(cache))
    stack = list(keys)
    dependencies = defaultdict(set)
    dependents = defaultdict(set)
    waiting = defaultdict(set)
    waiting_data = defaultdict(set)
    ready_set = set()
    seen = set()
    while stack:
        key = stack.pop()
        if key in seen:
            continue
        seen.add(key)
        # Touch the defaultdicts so that every visited key gets an entry,
        # even when it has no dependents / dependencies.
        dependents[key]
        waiting_data[key]
        dependencies[key]
        task = dsk.get(key, None)
        if task is None:
            # Use membership rather than truthiness: a cached value such as
            # ``0`` or an empty container is still valid data, and some
            # values (e.g. arrays) have no unambiguous truth value.
            if dependents[key] and key not in cache:
                raise ValueError(
                    "Missing dependency {} for dependents {}".format(
                        key, dependents[key]
                    )
                )
            continue
        elif isinstance(task, DataNode):
            cache[key] = task()
            for d in dependents[key]:
                if d in waiting:
                    # ``discard`` because ``key`` may already have been in
                    # the cache when ``d`` was examined, in which case it
                    # was never added to ``waiting[d]``.
                    waiting[d].discard(key)
                    if not waiting[d]:
                        del waiting[d]
                        ready_set.add(d)
                else:
                    ready_set.add(d)
        else:
            # Only look up this task's own dependencies instead of
            # materialising the full set of cache keys for every task.
            _wait = {dep for dep in task.dependencies if dep not in cache}
            if not _wait:
                ready_set.add(key)
            else:
                waiting[key] = _wait
            for dep in task.dependencies:
                dependencies[key].add(dep)
                dependents[dep].add(key)
                waiting_data[dep].add(key)
                stack.append(dep)

    ready = sorted(ready_set, key=sortkey, reverse=True)

    state = {
        "dependencies": dict(dependencies),
        "dependents": dict(dependents),
        "waiting": dict(waiting),
        "waiting_data": dict(waiting_data),
        "cache": cache,
        "ready": ready,
        "running": set(),
        "finished": set(),
        "released": set(),
    }

    return state
237"""
238Running tasks
239-------------
241When we execute tasks we both
2431. Perform the actual work of collecting the appropriate data and calling the function
2442. Manage administrative state to coordinate with the scheduler
245"""
def execute_task(key, task_info, dumps, loads, get_id, pack_exception):
    """
    Run a single task and package its outcome for the scheduler

    ``task_info`` is deserialized with ``loads`` into a ``(task, data)``
    pair and the task is called with the data.  On success the result is
    serialized together with the worker id returned by ``get_id``.  Any
    exception (including ``BaseException`` subclasses) is handed to
    ``pack_exception`` instead.

    Returns
    -------
    (key, result, failed)
        ``result`` is the serialized ``(value, worker_id)`` pair on success,
        or whatever ``pack_exception`` returned on failure; ``failed`` tells
        the two cases apart.
    """
    try:
        task, data = loads(task_info)
        payload = dumps((task(data), get_id()))
    except BaseException as exc:  # noqa: B036
        return key, pack_exception(exc, dumps), True
    return key, payload, False
def batch_execute_tasks(it):
    """
    Run ``execute_task`` once per argument tuple in ``it``

    Returns the list of ``execute_task`` results in input order.
    """
    outcomes = []
    for task_args in it:
        outcomes.append(execute_task(*task_args))
    return outcomes
def release_data(key, state, delete=True):
    """Mark ``key`` as released and optionally drop it from the cache

    The key's ``waiting_data`` entry (which must be empty by now) is
    removed, the key is added to ``state["released"]`` and, when ``delete``
    is true, its value is deleted from ``state["cache"]``.

    See Also
    --------
    finish_task
    """
    waiting_data = state["waiting_data"]
    if key in waiting_data:
        # Nothing may still be waiting on data that is about to be released.
        assert not waiting_data[key]
        del waiting_data[key]

    state["released"].add(key)

    if not delete:
        return
    del state["cache"][key]
def finish_task(
    dsk, key, state, results, sortkey, delete=True, release_data=release_data
):
    """
    Record that ``key`` has finished and update the scheduler state

    Dependents whose last outstanding dependency was ``key`` are moved from
    ``waiting`` onto the ``ready`` stack, and dependencies of ``key`` that
    nothing else needs (and that are not requested results) are released.

    Mutates ``state`` and returns it.  This should run atomically (with a
    lock).
    """
    waiting = state["waiting"]
    ready = state["ready"]
    for child in sorted(state["dependents"][key], key=sortkey, reverse=True):
        pending = waiting[child]
        pending.remove(key)
        if pending:
            continue
        # ``key`` was the last thing ``child`` was waiting for.
        del waiting[child]
        ready.append(child)

    for parent in state["dependencies"][key]:
        if parent in state["waiting_data"]:
            consumers = state["waiting_data"][parent]
            consumers.remove(key)
            releasable = not consumers and parent not in results
        else:
            releasable = delete and parent not in results
        if releasable:
            release_data(parent, state, delete=delete)

    state["finished"].add(key)
    state["running"].remove(key)

    return state
def nested_get(ind, coll):
    """Look up a possibly nested list of indices in a collection

    A list index yields a tuple of the lookups of its elements (applied
    recursively); anything else is used directly as ``coll[ind]``.

    Examples
    --------

    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    """
    if not isinstance(ind, list):
        return coll[ind]
    return tuple([nested_get(sub, coll) for sub in ind])
def default_get_id():
    """Fallback ``get_id``: report no worker id (always ``None``)"""
    return None
def default_pack_exception(e, dumps):
    """Fallback ``pack_exception``: re-raise instead of packing

    Uses a bare ``raise``, so it re-raises whichever exception is currently
    being handled; both arguments are ignored.
    """
    raise
def reraise(exc, tb=None):
    """Raise ``exc``, attaching traceback ``tb`` if it is not already set"""
    if exc.__traceback__ is tb:
        raise exc
    raise exc.with_traceback(tb)
def identity(x):
    """Return the argument unchanged.

    >>> identity(3)
    3
    """
    return x
365"""
366Task Selection
367--------------
369We often have a choice among many tasks to run next. This choice is both
370cheap and can significantly impact performance.
372We currently select tasks that have recently been made ready. We hope that
373this last-in-first-out policy reduces memory footprint
374"""
376"""
377`get`
378-----
380The main function of the scheduler. Get is the main entry point.
381"""
def get_async(
    submit,
    num_workers,
    dsk,
    result,
    cache=None,
    get_id=default_get_id,
    rerun_exceptions_locally=None,
    pack_exception=default_pack_exception,
    raise_exception=reraise,
    callbacks=None,
    dumps=identity,
    loads=identity,
    chunksize=None,
    **kwargs,
):
    """Asynchronous get function

    This is a general version of various asynchronous schedulers for dask.  It
    takes a ``concurrent.futures.Executor.submit`` function to form a more
    specific ``get`` method that walks through the dask graph with parallel
    workers, avoiding repeat computation and minimizing memory use.

    Parameters
    ----------
    submit : function
        A ``concurrent.futures.Executor.submit`` function
    num_workers : int
        The number of workers that task submissions can be spread over
    dsk : dict
        A dask dictionary specifying a workflow.  Objects that are not a
        ``Mapping`` are converted by calling their ``__dask_graph__`` method.
    result : key or list of keys
        Keys corresponding to desired data
    cache : dict-like, optional
        Temporary storage of results
    get_id : callable, optional
        Function to return the worker id, takes no arguments. Examples are
        `threading.current_thread` and `multiprocessing.current_process`.
    rerun_exceptions_locally : bool, optional
        Whether to rerun failing tasks in local process to enable debugging
        (False by default).  If the local rerun does not reproduce the
        failure, the original exception is raised.
    pack_exception : callable, optional
        Function to take an exception and ``dumps`` method, and return a
        serialized tuple of ``(exception, traceback)`` to send back to the
        scheduler. Default is to just raise the exception.
    raise_exception : callable, optional
        Function that takes an exception and a traceback, and raises an error.
    callbacks : tuple or list of tuples, optional
        Callbacks are passed in as tuples of length 5. Multiple sets of
        callbacks may be passed in as a list of tuples. For more information,
        see the dask.diagnostics documentation.
    dumps: callable, optional
        Function to serialize task data and results to communicate between
        worker and parent.  Defaults to identity.
    loads: callable, optional
        Inverse function of `dumps`.  Defaults to identity.
    chunksize: int, optional
        Size of chunks to use when dispatching work. Defaults to 1.
        If -1, will be computed to evenly divide ready work across workers.
    **kwargs
        Accepted for compatibility and ignored.

    Returns
    -------
    The computed values for ``result``, nested in tuples the same way
    ``result`` is nested in lists.

    Raises
    ------
    ValueError
        If tasks are waiting but none of them is ready to run.

    See Also
    --------
    threaded.get
    """
    chunksize = chunksize or config.get("chunksize", 1)

    queue = Queue()

    if isinstance(result, list):
        result_flat = set(flatten(result))
    else:
        result_flat = {result}
    results = set(result_flat)

    if not isinstance(dsk, Mapping):
        dsk = dsk.__dask_graph__()
    dsk = convert_legacy_graph(dsk)
    with local_callbacks(callbacks) as callbacks:
        _, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
        started_cbs = []
        succeeded = False
        # if start_state_from_dask fails, we will have something
        # to pass to the final block.
        state = {}
        try:
            for cb in callbacks:
                if cb[0]:
                    cb[0](dsk)
                started_cbs.append(cb)

            keyorder = order(dsk)

            state = start_state_from_dask(
                dsk, keys=results, cache=cache, sortkey=keyorder.get
            )

            for _, start_state, _, _, _ in callbacks:
                if start_state:
                    start_state(dsk, state)

            if rerun_exceptions_locally is None:
                rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)

            if state["waiting"] and not state["ready"]:
                raise ValueError("Found no accessible jobs in dask")

            def fire_tasks(chunksize):
                """Fire off a task to the thread pool"""
                # Determine chunksize and/or number of tasks to submit
                nready = len(state["ready"])
                if chunksize == -1:
                    ntasks = nready
                    # Ceiling division: spread ready work evenly over workers
                    chunksize = -(ntasks // -num_workers)
                else:
                    used_workers = -(len(state["running"]) // -chunksize)
                    avail_workers = max(num_workers - used_workers, 0)
                    ntasks = min(nready, chunksize * avail_workers)

                # Nothing to submit.  Returning here also avoids a division
                # by zero below: with ``chunksize == -1`` and no ready tasks
                # the computed chunksize is 0.
                if not ntasks:
                    return

                # Prep all ready tasks for submission
                args = []
                for _ in range(ntasks):
                    # Get the next task to compute (most recently added)
                    key = state["ready"].pop()
                    # Notify task is running
                    state["running"].add(key)
                    for f in pretask_cbs:
                        f(key, dsk, state)

                    # Prep args to send
                    data = {
                        dep: state["cache"][dep] for dep in state["dependencies"][key]
                    }
                    args.append(
                        (
                            key,
                            dumps((dsk[key], data)),
                            dumps,
                            loads,
                            get_id,
                            pack_exception,
                        )
                    )

                # Batch submit
                for i in range(-(len(args) // -chunksize)):
                    each_args = args[i * chunksize : (i + 1) * chunksize]
                    if not each_args:
                        break
                    fut = submit(batch_execute_tasks, each_args)
                    # The finished future itself is put on the queue
                    fut.add_done_callback(queue.put)

            # Main loop, wait on tasks to finish, insert new ones
            while state["waiting"] or state["ready"] or state["running"]:
                fire_tasks(chunksize)
                for key, res_info, failed in queue_get(queue).result():
                    if failed:
                        exc, tb = loads(res_info)
                        if rerun_exceptions_locally:
                            data = {
                                dep: state["cache"][dep]
                                for dep in get_dependencies(dsk, key)
                            }
                            task = dsk[key]
                            task(data)  # Re-execute locally
                        # Reached when no local rerun was requested, or when
                        # the rerun did not reproduce the failure.  In both
                        # cases ``res_info`` holds an exception, not a
                        # result, so it must not be stored in the cache.
                        raise_exception(exc, tb)
                    res, worker_id = loads(res_info)
                    state["cache"][key] = res
                    finish_task(dsk, key, state, results, keyorder.get)
                    for f in posttask_cbs:
                        f(key, res, dsk, state, worker_id)

            succeeded = True

        finally:
            # Only callbacks whose start hook was reached get their finish
            # hook called; the last argument flags an errored run.
            for _, _, _, _, finish in started_cbs:
                if finish:
                    finish(dsk, state, not succeeded)

    return nested_get(result, state["cache"])
566""" Synchronous concrete version of get_async
568Usually we supply a ``concurrent.futures.Executor``. Here we provide a
569sequential one. This is useful for debugging and for code dominated by the
570GIL
571"""
class SynchronousExecutor(Executor):
    """Executor that runs every submitted call immediately in the caller"""

    _max_workers = 1

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` now and return an already-done Future

        Any exception raised by ``fn`` (including ``BaseException``
        subclasses) is stored on the future rather than propagated.
        """
        future = Future()
        try:
            value = fn(*args, **kwargs)
        except BaseException as exc:  # noqa: B036
            future.set_exception(exc)
        else:
            future.set_result(value)
        return future


synchronous_executor = SynchronousExecutor()
def get_sync(dsk: Mapping, keys: Sequence[Key] | Key, **kwargs):
    """Compute ``keys`` from ``dsk`` with ``get_async`` on the synchronous executor

    Tasks run one after another in the calling thread, which can be useful
    for debugging.  A ``num_workers`` keyword, if given, is dropped; the
    executor's own worker count is used instead.
    """
    # if num_workers present, remove it
    kwargs.pop("num_workers", None)
    executor = synchronous_executor
    return get_async(executor.submit, executor._max_workers, dsk, keys, **kwargs)
604""" Adaptor for ``multiprocessing.Pool`` instances
606Usually we supply a ``concurrent.futures.Executor``. Here we provide a wrapper
607class for ``multiprocessing.Pool`` instances so we can treat them like
608``concurrent.futures.Executor`` instances instead.
610This is mainly useful for legacy use cases or users that prefer
611``multiprocessing.Pool``.
612"""
class MultiprocessingPoolExecutor(Executor):
    """Present a ``multiprocessing.Pool``-like object as an ``Executor``"""

    def __init__(self, pool):
        self.pool = pool
        # Worker count is taken from the pool's private ``_pool`` list.
        self._max_workers = len(pool._pool)

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn`` through the pool's ``apply_async`` and return a Future"""
        apply_async = self.pool.apply_async
        return submit_apply_async(apply_async, fn, *args, **kwargs)
def submit_apply_async(apply_async, fn, *args, **kwargs):
    """Submit ``fn`` through an ``apply_async``-style callable

    The new Future's ``set_result`` and ``set_exception`` methods are passed
    to ``apply_async`` as its fourth and fifth positional arguments.

    Returns the Future.
    """
    future = Future()
    on_success = future.set_result
    on_error = future.set_exception
    apply_async(fn, args, kwargs, on_success, on_error)
    return future
def get_apply_async(apply_async, num_workers, *args, **kwargs):
    """Run ``get_async`` using an ``apply_async``-style callable for submission

    ``apply_async`` is bound into ``submit_apply_async`` to form the submit
    function; all remaining arguments are forwarded to ``get_async``.
    """
    submit = partial(submit_apply_async, apply_async)
    return get_async(submit, num_workers, *args, **kwargs)
def sortkey(item):
    """Sorting key function that is robust to different types

    Strings and tuples are both common key types in dask graphs, but Python 3
    cannot compare a string with a tuple directly.  Prefixing each item with
    the name of its type gives a form in which such keys can be compared.

    Examples
    --------
    >>> sortkey('Hello')
    ('str', 'Hello')

    >>> sortkey(('x', 1))
    ('tuple', ('x', 1))
    """
    type_name = type(item).__name__
    return type_name, item