1"""
2Asynchronous Shared-Memory Scheduler for Dask Graphs.
4This scheduler coordinates several workers to execute tasks in a dask graph in
5parallel. It depends on a ``concurrent.futures.Executor``
6and a corresponding Queue for worker-to-scheduler communication.
8It tries to execute tasks in an order which maintains a small memory footprint
9throughout execution. It does this by running tasks that allow us to release
10data resources.

Task Selection Policy
=====================

When we complete a task we add more data into our set of available data; this
new data makes new tasks available. We preferentially choose tasks that were
just made available, in a last-in-first-out fashion. We implement this as a
simple stack. This results in depth-first rather than breadth-first behavior,
which encourages us to process batches of data to completion before starting
in on new data when possible.

When the addition of new data readies multiple tasks simultaneously we add
tasks to the stack in sorted order so that tasks with greater key names are
run first. This can be handy to break ties in a predictable fashion.
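
As a minimal sketch of this stack discipline (with hypothetical keys ``'a'``,
``'b'``, and ``'c'``):

>>> stack = ['a', 'b']
>>> stack.append('c')  # 'c' was made available most recently ...
>>> stack.pop()        # ... so it is selected next (LIFO)
'c'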

State
=====

Many functions pass around a ``state`` variable that holds the current state of
the computation. This variable consists of several other dictionaries and
sets, explained below.

Constant state
--------------

1. dependencies: {x: [a, b, c]} a, b, and c must be run before x
2. dependents: {a: [x, y]} a must run before x and y

Changing state
--------------

### Data

1. cache: available concrete data. {key: actual-data}
2. released: data that we've seen, used, and released because it is no longer
   needed

### Jobs

1. ready: A LIFO stack of ready-to-run tasks
2. running: A set of tasks currently in execution
3. finished: A set of finished tasks
4. waiting: which tasks are still waiting on others :: {key: {keys}}
   Real-time equivalent of dependencies
5. waiting_data: which tasks still need each piece of available data :: {key: {keys}}
   Real-time equivalent of dependents

Examples
--------

>>> import pprint  # doctest: +SKIP
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
>>> pprint.pprint(start_state_from_dask(dsk))  # doctest: +SKIP
{'cache': {'x': 1, 'y': 2},
 'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
 'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
 'finished': set(),
 'ready': ['z'],
 'released': set(),
 'running': set(),
 'waiting': {'w': {'z'}},
 'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}

Optimizations
=============

We build this scheduler with out-of-core array operations in mind. To this end
we have encoded some particular optimizations.

Compute to release data
-----------------------

When we choose a new task to execute we often have many options. Policies at
this stage are cheap and can significantly impact performance. One could
imagine policies that expose parallelism, drive towards a particular output,
etc.

Our current policy is to run tasks that were most recently made available.

Inlining computations
---------------------

We hold on to intermediate computations either in memory or on disk.

For very cheap computations that may emit new copies of the data, like
``np.transpose`` or possibly even ``x + 1``, we choose not to store these as
separate pieces of data / tasks. Instead we combine them with the computations
that require them. This may result in repeated computation but saves
significantly on space and scheduling overhead.

See the function ``inline_functions`` for more information.
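
As a hedged sketch of the effect (``inline_functions`` lives in
``dask.optimization``; the exact output shape may vary between dask versions):

>>> from dask.optimization import inline_functions  # doctest: +SKIP
>>> double = lambda x: 2 * x  # doctest: +SKIP
>>> dsk = {'x': 1, 'd': (double, 'x'), 'w': (add, 'd', 'd')}  # doctest: +SKIP
>>> inline_functions(dsk, ['w'], fast_functions=[double])  # doctest: +SKIP
{'x': 1, 'w': (add, (double, 'x'), (double, 'x'))}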
108"""

from __future__ import annotations

import os
from collections import defaultdict
from collections.abc import Mapping, Sequence
from concurrent.futures import Executor, Future
from functools import partial
from queue import Empty, Queue

from dask import config
from dask._task_spec import DataNode, convert_legacy_graph
from dask.callbacks import local_callbacks, unpack_callbacks
from dask.core import flatten, get_dependencies
from dask.order import order
from dask.typing import Key

if os.name == "nt":
    # On Windows, Queue.get doesn't handle interrupts properly. To work
    # around this we poll at an interval large enough that it shouldn't
    # affect performance, but small enough that users trying to kill an
    # application shouldn't care.
    def queue_get(q):
        while True:
            try:
                return q.get(block=True, timeout=0.1)
            except Empty:
                pass

else:

    def queue_get(q):
        return q.get()


def start_state_from_dask(dsk, cache=None, sortkey=None, keys=None):
    """Start state from a dask graph

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': defaultdict(None, {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}}),
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    """
    if sortkey is None:
        sortkey = order(dsk).get
    if cache is None:
        cache = config.get("cache", None)
    if cache is None:
        cache = dict()
    if keys is None:
        keys = list(set(dsk) - set(cache))
    dsk = convert_legacy_graph(dsk, all_keys=set(dsk) | set(cache))
    stack = list(keys)
    dependencies = defaultdict(set)
    dependents = defaultdict(set)
    waiting = defaultdict(set)
    waiting_data = defaultdict(set)
    ready_set = set()
    seen = set()
    while stack:
        key = stack.pop()
        if key in seen:
            continue
        seen.add(key)
        # Touch the defaultdicts so every visited key gets an entry,
        # even if it remains empty
        dependents[key]
        waiting_data[key]
        dependencies[key]
        task = dsk.get(key, None)
        if task is None:
            if dependents[key] and not cache.get(key):
                raise ValueError(
                    f"Missing dependency {key} for dependents {dependents[key]}"
                )
            continue
        elif isinstance(task, DataNode):
            # Literal data: materialize it now and update any dependents
            # that were waiting on it
            cache[key] = task()
            dependencies[key]
            for d in dependents[key]:
                if d in waiting:
                    waiting[d].remove(key)
                    if not waiting[d]:
                        del waiting[d]
                        ready_set.add(d)
                else:
                    ready_set.add(d)
        else:
            _wait = task.dependencies - set(cache)
            if not _wait:
                ready_set.add(key)
            else:
                waiting[key] = set(_wait)
            for dep in task.dependencies:
                dependencies[key].add(dep)
                dependents[dep].add(key)
                waiting_data[dep].add(key)
                stack.append(dep)

    ready = sorted(ready_set, key=sortkey, reverse=True)

    state = {
        "dependencies": dict(dependencies),
        "dependents": dict(dependents),
        "waiting": dict(waiting),
        "waiting_data": dict(waiting_data),
        "cache": cache,
        "ready": ready,
        "running": set(),
        "finished": set(),
        "released": set(),
    }

    return state
235"""
236Running tasks
237-------------
239When we execute tasks we both
2411. Perform the actual work of collecting the appropriate data and calling the function
2422. Manage administrative state to coordinate with the scheduler
243"""


def execute_task(key, task_info, dumps, loads, get_id, pack_exception):
    """
    Compute task and handle all administration

    See Also
    --------
    _execute_task : actually execute task
    """
    try:
        task, data = loads(task_info)
        result = task(data)
        id = get_id()
        result = dumps((result, id))
        failed = False
    except BaseException as e:
        result = pack_exception(e, dumps)
        failed = True
    return key, result, failed
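

"""
A hedged sketch of the worker-side protocol above, assuming the identity
serializers and the default helpers defined later in this module. The graph
is converted so that ``dsk['y']`` is a callable task object:

>>> inc = lambda x: x + 1  # doctest: +SKIP
>>> dsk = convert_legacy_graph({'y': (inc, 'x'), 'x': 1})  # doctest: +SKIP
>>> execute_task('y', (dsk['y'], {'x': 1}), identity, identity,
...              default_get_id, default_pack_exception)  # doctest: +SKIP
('y', (2, None), False)
"""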


def batch_execute_tasks(it):
    """
    Batch computing of multiple tasks with `execute_task`
    """
    return [execute_task(*a) for a in it]


def release_data(key, state, delete=True):
    """Remove data from temporary storage

    See Also
    --------
    finish_task
    """
    if key in state["waiting_data"]:
        assert not state["waiting_data"][key]
        del state["waiting_data"][key]

    state["released"].add(key)

    if delete:
        del state["cache"][key]


def finish_task(
    dsk, key, state, results, sortkey, delete=True, release_data=release_data
):
    """
    Update execution state after a task finishes

    Mutates. This should run atomically (with a lock).
    """
    # Mark dependents whose last remaining dependency just finished as ready
    for dep in sorted(state["dependents"][key], key=sortkey, reverse=True):
        s = state["waiting"][dep]
        s.remove(key)
        if not s:
            del state["waiting"][dep]
            state["ready"].append(dep)

    # Release dependencies that no other pending task still needs
    for dep in state["dependencies"][key]:
        if dep in state["waiting_data"]:
            s = state["waiting_data"][dep]
            s.remove(key)
            if not s and dep not in results:
                release_data(dep, state, delete=delete)
        elif delete and dep not in results:
            release_data(dep, state, delete=delete)

    state["finished"].add(key)
    state["running"].remove(key)

    return state


def nested_get(ind, coll):
    """Get nested index from collection

    Examples
    --------
    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    """
    if isinstance(ind, list):
        return tuple(nested_get(i, coll) for i in ind)
    else:
        return coll[ind]


def default_get_id():
    """Default get_id"""
    return None


def default_pack_exception(e, dumps):
    raise e


def reraise(exc, tb=None):
    if exc.__traceback__ is not tb:
        raise exc.with_traceback(tb)
    raise exc


def identity(x):
    """Identity function. Returns x.

    >>> identity(3)
    3
    """
    return x
363"""
364Task Selection
365--------------
367We often have a choice among many tasks to run next. This choice is both
368cheap and can significantly impact performance.
370We currently select tasks that have recently been made ready. We hope that
371this first-in-first-out policy reduces memory footprint
372"""
374"""
375`get`
376-----
378The main function of the scheduler. Get is the main entry point.
379"""


def get_async(
    submit,
    num_workers,
    dsk,
    result,
    cache=None,
    get_id=default_get_id,
    rerun_exceptions_locally=None,
    pack_exception=default_pack_exception,
    raise_exception=reraise,
    callbacks=None,
    dumps=identity,
    loads=identity,
    chunksize=None,
    **kwargs,
):
398 """Asynchronous get function
400 This is a general version of various asynchronous schedulers for dask. It
401 takes a ``concurrent.futures.Executor.submit`` function to form a more
402 specific ``get`` method that walks through the dask array with parallel
403 workers, avoiding repeat computation and minimizing memory use.
405 Parameters
406 ----------
407 submit : function
408 A ``concurrent.futures.Executor.submit`` function
409 num_workers : int
410 The number of workers that task submissions can be spread over
411 dsk : dict
412 A dask dictionary specifying a workflow
413 result : key or list of keys
414 Keys corresponding to desired data
415 cache : dict-like, optional
416 Temporary storage of results
417 get_id : callable, optional
418 Function to return the worker id, takes no arguments. Examples are
419 `threading.current_thread` and `multiprocessing.current_process`.
420 rerun_exceptions_locally : bool, optional
421 Whether to rerun failing tasks in local process to enable debugging
422 (False by default)
423 pack_exception : callable, optional
424 Function to take an exception and ``dumps`` method, and return a
425 serialized tuple of ``(exception, traceback)`` to send back to the
426 scheduler. Default is to just raise the exception.
427 raise_exception : callable, optional
428 Function that takes an exception and a traceback, and raises an error.
429 callbacks : tuple or list of tuples, optional
430 Callbacks are passed in as tuples of length 5. Multiple sets of
431 callbacks may be passed in as a list of tuples. For more information,
432 see the dask.diagnostics documentation.
433 dumps: callable, optional
434 Function to serialize task data and results to communicate between
435 worker and parent. Defaults to identity.
436 loads: callable, optional
437 Inverse function of `dumps`. Defaults to identity.
438 chunksize: int, optional
439 Size of chunks to use when dispatching work. Defaults to 1.
440 If -1, will be computed to evenly divide ready work across workers.

    See Also
    --------
    threaded.get
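
    Examples
    --------
    A minimal sketch with a thread pool and the default (identity)
    serializers:

    >>> from concurrent.futures import ThreadPoolExecutor  # doctest: +SKIP
    >>> inc = lambda x: x + 1  # doctest: +SKIP
    >>> with ThreadPoolExecutor(4) as pool:  # doctest: +SKIP
    ...     get_async(pool.submit, 4, {'x': 1, 'y': (inc, 'x')}, 'y')
    2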
445 """
    chunksize = chunksize or config.get("chunksize", 1)

    queue = Queue()

    if isinstance(result, list):
        result_flat = set(flatten(result))
    else:
        result_flat = {result}
    results = set(result_flat)

    if not isinstance(dsk, Mapping):
        dsk = dsk.__dask_graph__()
    dsk = convert_legacy_graph(dsk)
    with local_callbacks(callbacks) as callbacks:
        _, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
        started_cbs = []
        succeeded = False
        # if start_state_from_dask fails, we will have something
        # to pass to the final block.
        state = {}
        try:
            for cb in callbacks:
                if cb[0]:
                    cb[0](dsk)
                started_cbs.append(cb)

            keyorder = order(dsk)

            state = start_state_from_dask(
                dsk, keys=results, cache=cache, sortkey=keyorder.get
            )

            for _, start_state, _, _, _ in callbacks:
                if start_state:
                    start_state(dsk, state)

            if rerun_exceptions_locally is None:
                rerun_exceptions_locally = config.get(
                    "rerun_exceptions_locally", False
                )

            if state["waiting"] and not state["ready"]:
                raise ValueError("Found no accessible jobs in dask")

            def fire_tasks(chunksize):
                """Fire off a task to the thread pool"""
                # Determine chunksize and/or number of tasks to submit
                nready = len(state["ready"])
                if chunksize == -1:
                    ntasks = nready
                    # Ceiling division: -(a // -b) == ceil(a / b)
                    chunksize = -(ntasks // -num_workers)
                else:
                    # Workers currently occupied, rounding up
                    used_workers = -(len(state["running"]) // -chunksize)
                    avail_workers = max(num_workers - used_workers, 0)
                    ntasks = min(nready, chunksize * avail_workers)

                # Prep all ready tasks for submission
                args = []
                for _ in range(ntasks):
                    # Get the next task to compute (most recently added)
                    key = state["ready"].pop()
                    # Notify task is running
                    state["running"].add(key)
                    for f in pretask_cbs:
                        f(key, dsk, state)

                    # Prep args to send
                    data = {
                        dep: state["cache"][dep] for dep in state["dependencies"][key]
                    }
                    args.append(
                        (
                            key,
                            dumps((dsk[key], data)),
                            dumps,
                            loads,
                            get_id,
                            pack_exception,
                        )
                    )

                # Batch submit
                for i in range(-(len(args) // -chunksize)):
                    each_args = args[i * chunksize : (i + 1) * chunksize]
                    if not each_args:
                        break
                    fut = submit(batch_execute_tasks, each_args)
                    fut.add_done_callback(queue.put)

            # Main loop, wait on tasks to finish, insert new ones
            while state["waiting"] or state["ready"] or state["running"]:
                fire_tasks(chunksize)
                for key, res_info, failed in queue_get(queue).result():
                    if failed:
                        exc, tb = loads(res_info)
                        if rerun_exceptions_locally:
                            data = {
                                dep: state["cache"][dep]
                                for dep in get_dependencies(dsk, key)
                            }
                            task = dsk[key]
                            task(data)  # Re-execute locally
                        else:
                            raise_exception(exc, tb)
                    # Both failure branches above raise, so from here on the
                    # task is known to have succeeded
                    res, worker_id = loads(res_info)
                    state["cache"][key] = res
                    finish_task(dsk, key, state, results, keyorder.get)
                    for f in posttask_cbs:
                        f(key, res, dsk, state, worker_id)

            succeeded = True

        finally:
            for _, _, _, _, finish in started_cbs:
                if finish:
                    finish(dsk, state, not succeeded)

    return nested_get(result, state["cache"])
564""" Synchronous concrete version of get_async
566Usually we supply a ``concurrent.futures.Executor``. Here we provide a
567sequential one. This is useful for debugging and for code dominated by the
568GIL
569"""


class SynchronousExecutor(Executor):
    """A trivial executor that runs ``fn`` eagerly, in-thread, on submit."""

    _max_workers = 1

    def submit(self, fn, *args, **kwargs):
        fut = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except BaseException as e:
            fut.set_exception(e)
        return fut


synchronous_executor = SynchronousExecutor()
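
"""
A quick sanity check of the eager executor (a minimal sketch):

>>> fut = synchronous_executor.submit(lambda x: x + 1, 1)
>>> fut.result()
2
"""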


def get_sync(dsk: Mapping, keys: Sequence[Key] | Key, **kwargs):
    """A naive synchronous version of get_async

    Can be useful for debugging.
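
    A minimal sketch, using the scheduler semantics described above:

    >>> inc = lambda x: x + 1  # doctest: +SKIP
    >>> get_sync({'x': 1, 'y': (inc, 'x')}, 'y')  # doctest: +SKIP
    2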
591 """
592 kwargs.pop("num_workers", None) # if num_workers present, remove it
593 return get_async(
594 synchronous_executor.submit,
595 synchronous_executor._max_workers,
596 dsk,
597 keys,
598 **kwargs,
599 )
602""" Adaptor for ``multiprocessing.Pool`` instances
604Usually we supply a ``concurrent.futures.Executor``. Here we provide a wrapper
605class for ``multiprocessing.Pool`` instances so we can treat them like
606``concurrent.futures.Executor`` instances instead.
608This is mainly useful for legacy use cases or users that prefer
609``multiprocessing.Pool``.
610"""


class MultiprocessingPoolExecutor(Executor):
    def __init__(self, pool):
        self.pool = pool
        self._max_workers = len(pool._pool)

    def submit(self, fn, *args, **kwargs):
        return submit_apply_async(self.pool.apply_async, fn, *args, **kwargs)


def submit_apply_async(apply_async, fn, *args, **kwargs):
    fut = Future()
    apply_async(fn, args, kwargs, fut.set_result, fut.set_exception)
    return fut


def get_apply_async(apply_async, num_workers, *args, **kwargs):
    return get_async(
        partial(submit_apply_async, apply_async), num_workers, *args, **kwargs
    )
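
"""
A hedged usage sketch for the pool adaptor (assumes a standard
``multiprocessing.Pool``, whose ``apply_async`` accepts success and error
callbacks that we wire straight into the wrapped future):

>>> from multiprocessing import Pool  # doctest: +SKIP
>>> with Pool(2) as pool:  # doctest: +SKIP
...     executor = MultiprocessingPoolExecutor(pool)
...     executor.submit(sum, [1, 2, 3]).result()
6
"""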


def sortkey(item):
    """Sorting key function that is robust to different types

    Both strings and tuples are common key types in dask graphs.
    However, in Python 3 one cannot compare strings with tuples directly.
    This function maps many types to a form where they can be compared.

    Examples
    --------
    >>> sortkey('Hello')
    ('str', 'Hello')

    >>> sortkey(('x', 1))
    ('tuple', ('x', 1))
    """
    return (type(item).__name__, item)